Merge branch 'master' into import-data-sanitation

Jan Eitzinger
2023-04-07 08:57:42 +02:00
110 changed files with 8191 additions and 1464 deletions

@@ -55,7 +55,7 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
 	b, err := os.ReadFile(filename)
 	if err != nil {
-		log.Errorf("fsBackend loadJobMeta()- %v", err)
+		log.Errorf("loadJobMeta() > open file error: %v", err)
 		return &schema.JobMeta{}, err
 	}
 	if config.Keys.Validate {
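
Review note: this hunk pulls in the logging refactor from master; error messages now name the function and the failing step ("loadJobMeta() > open file error") instead of the old "fsBackend loadJobMeta()-" prefix. A minimal sketch of the convention, using the standard library `log` as a stand-in for cc-backend's `pkg/log` and raw bytes in place of `*schema.JobMeta`:

```go
package main

import (
	"log"
	"os"
)

// loadJobMeta mirrors the error-reporting style above; the real
// function decodes and returns a *schema.JobMeta.
func loadJobMeta(filename string) ([]byte, error) {
	b, err := os.ReadFile(filename)
	if err != nil {
		// Convention: "<function>() > <failing step> error: <err>"
		log.Printf("loadJobMeta() > open file error: %v", err)
		return nil, err
	}
	return b, nil
}

func main() {
	if _, err := loadJobMeta("no-such-meta.json"); err != nil {
		log.Printf("caller sees: %v", err)
	}
}
```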
@@ -105,12 +105,12 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) {
 	var config FsArchiveConfig
 	if err := json.Unmarshal(rawConfig, &config); err != nil {
-		log.Errorf("fsBackend Init()- %v", err)
+		log.Warnf("Init() > Unmarshal error: %#v", err)
 		return 0, err
 	}
 	if config.Path == "" {
-		err := fmt.Errorf("fsBackend Init()- empty path")
-		log.Errorf("fsBackend Init()- %v", err)
+		err := fmt.Errorf("Init() : empty config.Path")
+		log.Errorf("Init() > config.Path error: %v", err)
 		return 0, err
 	}
 	fsa.path = config.Path
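
The `Init()` changes split two failure modes that the old message conflated: a config blob that does not unmarshal (logged with `Warnf`) and a config that parses but has an empty `path` (logged with `Errorf`). A self-contained sketch of that parsing, assuming `FsArchiveConfig` carries only the `Path` field used here:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type FsArchiveConfig struct {
	Path string `json:"path"`
}

// parseConfig reproduces the two Init() guards as plain errors.
func parseConfig(rawConfig json.RawMessage) (string, error) {
	var config FsArchiveConfig
	if err := json.Unmarshal(rawConfig, &config); err != nil {
		return "", fmt.Errorf("Init() > Unmarshal error: %w", err)
	}
	if config.Path == "" {
		return "", fmt.Errorf("Init() : empty config.Path")
	}
	return config.Path, nil
}

func main() {
	p, err := parseConfig(json.RawMessage(`{"path":"/var/lib/job-archive"}`))
	fmt.Println(p, err) // /var/lib/job-archive <nil>
	_, err = parseConfig(json.RawMessage(`{}`))
	fmt.Println(err) // Init() : empty config.Path
}
```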
@@ -133,7 +133,7 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) {
 	entries, err := os.ReadDir(fsa.path)
 	if err != nil {
-		log.Errorf("fsBackend Init()- %v", err)
+		log.Errorf("Init() > ReadDir() error: %v", err)
 		return 0, err
 	}
@@ -150,10 +150,10 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) {
 func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
+	var isCompressed bool = true
+	filename := getPath(job, fsa.path, "data.json.gz")
+	if !checkFileExists(filename) {
+		filename = getPath(job, fsa.path, "data.json")
+		isCompressed = false
+	}
-	return nil, err
+	return loadJobData(filename, isCompressed)
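
This hunk is part of the compressed-archive support merged from master: `LoadJobData` now prefers `data.json.gz` and only falls back to plain `data.json`, passing an `isCompressed` flag down to the unexported `loadJobData` helper. A sketch of what that dispatch plausibly looks like; the gzip handling is an assumption, only the filename fallback appears in the diff:

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
)

func checkFileExists(filename string) bool {
	_, err := os.Stat(filename)
	return err == nil
}

// loadJobData stands in for the helper the diff calls; the real one
// decodes schema.JobData instead of returning raw bytes.
func loadJobData(filename string, isCompressed bool) ([]byte, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, fmt.Errorf("loadJobData() > open file error: %w", err)
	}
	defer f.Close()

	var r io.Reader = f
	if isCompressed {
		gz, err := gzip.NewReader(f)
		if err != nil {
			return nil, fmt.Errorf("loadJobData() > gzip error: %w", err)
		}
		defer gz.Close()
		r = gz
	}
	return io.ReadAll(r)
}

func main() {
	// Prefer the compressed file, fall back to plain JSON.
	filename, isCompressed := "data.json.gz", true
	if !checkFileExists(filename) {
		filename, isCompressed = "data.json", false
	}
	b, err := loadJobData(filename, isCompressed)
	fmt.Println(len(b), err)
}
```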
@@ -169,12 +169,13 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 	b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
 	if err != nil {
-		log.Errorf("fsBackend LoadClusterCfg()- %v", err)
+		log.Errorf("LoadClusterCfg() > open file error: %v", err)
 		return &schema.Cluster{}, err
 	}
-	// if config.Keys.Validate {
-	if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
-		return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
-	}
+	// if config.Keys.Validate {
+	if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
+		log.Warnf("Validate cluster config: %v\n", err)
+		return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
+	}
 	// }
 	return DecodeCluster(bytes.NewReader(b))
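
Two things worth noting here: the `config.Keys.Validate` gate stays commented out, so cluster configs are always validated, and a failed validation is now logged as a warning *and* returned as an error, so both operators and callers see it. A sketch with a toy validator in place of `schema.Validate(schema.ClusterCfg, …)`:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
)

// validateClusterCfg is a toy stand-in for schema.Validate: it only
// demands a non-empty "name" field.
func validateClusterCfg(r io.Reader) error {
	var v struct {
		Name string `json:"name"`
	}
	if err := json.NewDecoder(r).Decode(&v); err != nil {
		return err
	}
	if v.Name == "" {
		return fmt.Errorf("missing required field: name")
	}
	return nil
}

func loadClusterCfg(b []byte) error {
	if err := validateClusterCfg(bytes.NewReader(b)); err != nil {
		// Warn for the log, then still fail the load for the caller.
		log.Printf("Validate cluster config: %v", err)
		return fmt.Errorf("validate cluster config: %w", err)
	}
	return nil // the real code continues with DecodeCluster(bytes.NewReader(b))
}

func main() {
	fmt.Println(loadClusterCfg([]byte(`{}`)))                     // fails validation
	fmt.Println(loadClusterCfg([]byte(`{"name":"testcluster"}`))) // <nil>
}
```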
@@ -186,7 +187,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 	go func() {
 		clustersDir, err := os.ReadDir(fsa.path)
 		if err != nil {
-			log.Fatalf("Reading clusters failed: %s", err.Error())
+			log.Fatalf("Reading clusters failed @ cluster dirs: %s", err.Error())
 		}
 		for _, clusterDir := range clustersDir {
@@ -195,7 +196,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 			}
 			lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
 			if err != nil {
-				log.Fatalf("Reading jobs failed: %s", err.Error())
+				log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
 			}
 			for _, lvl1Dir := range lvl1Dirs {
@@ -206,19 +207,18 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 				lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
 				if err != nil {
-					log.Fatalf("Reading jobs failed: %s", err.Error())
+					log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
 				}
 				for _, lvl2Dir := range lvl2Dirs {
 					dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
 					startTimeDirs, err := os.ReadDir(dirpath)
 					if err != nil {
-						log.Fatalf("Reading jobs failed: %s", err.Error())
+						log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
 					}
 					for _, startTimeDir := range startTimeDirs {
 						if startTimeDir.IsDir() {
 							job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
 							if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
 								log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
 							}
@@ -237,6 +237,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 								log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
 							}
 							ch <- JobContainer{Meta: job, Data: &data}
+							log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
 						} else {
 							ch <- JobContainer{Meta: job, Data: nil}
 						}
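
The four `Reading … failed @ …` messages now identify which level of the archive walk died. For orientation, a sketch of the layout `Iter()` traverses, `<path>/<cluster>/<lvl1>/<lvl2>/<starttime>/meta.json` (in cc-backend the lvl1/lvl2 split is derived from the job ID; treat that detail as an assumption here), returning errors where the original deliberately calls `log.Fatalf`:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// walkArchive visits every <cluster>/<lvl1>/<lvl2>/<starttime> job
// directory under root, mirroring the nested loops in Iter().
func walkArchive(root string, visit func(jobDir string)) error {
	clustersDir, err := os.ReadDir(root)
	if err != nil {
		return fmt.Errorf("Reading clusters failed @ cluster dirs: %w", err)
	}
	for _, clusterDir := range clustersDir {
		if !clusterDir.IsDir() {
			continue
		}
		lvl1Dirs, err := os.ReadDir(filepath.Join(root, clusterDir.Name()))
		if err != nil {
			return fmt.Errorf("Reading jobs failed @ lvl1 dirs: %w", err)
		}
		for _, lvl1Dir := range lvl1Dirs {
			lvl2Dirs, err := os.ReadDir(filepath.Join(root, clusterDir.Name(), lvl1Dir.Name()))
			if err != nil {
				return fmt.Errorf("Reading jobs failed @ lvl2 dirs: %w", err)
			}
			for _, lvl2Dir := range lvl2Dirs {
				dirpath := filepath.Join(root, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
				startTimeDirs, err := os.ReadDir(dirpath)
				if err != nil {
					return fmt.Errorf("Reading jobs failed @ starttime dirs: %w", err)
				}
				for _, startTimeDir := range startTimeDirs {
					if startTimeDir.IsDir() {
						visit(filepath.Join(dirpath, startTimeDir.Name()))
					}
				}
			}
		}
	}
	return nil
}

func main() {
	_ = walkArchive("/var/lib/job-archive", func(jobDir string) {
		fmt.Println(jobDir)
	})
}
```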
@@ -259,12 +260,15 @@ func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
 	}
 	f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
 	if err != nil {
+		log.Error("Error while creating filepath for meta.json")
 		return err
 	}
 	if err := EncodeJobMeta(f, jobMeta); err != nil {
+		log.Error("Error while encoding job metadata to meta.json file")
 		return err
 	}
 	if err := f.Close(); err != nil {
+		log.Warn("Error while closing meta.json file")
 		return err
 	}
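
`StoreJobMeta` now logs at every failure point, including the previously silent `f.Close()`. The close check matters: with buffered writes, a failing `Close` can be the only signal that the metadata never reached disk. A sketch of the create/encode/close sequence with a stand-in encoder in place of `EncodeJobMeta`:

```go
package main

import (
	"encoding/json"
	"io"
	"log"
	"os"
)

// jobMeta is a stand-in for schema.JobMeta.
type jobMeta struct {
	JobID   int64  `json:"jobId"`
	Cluster string `json:"cluster"`
}

// encodeJobMeta stands in for EncodeJobMeta.
func encodeJobMeta(w io.Writer, meta *jobMeta) error {
	return json.NewEncoder(w).Encode(meta)
}

func storeJobMeta(path string, meta *jobMeta) error {
	f, err := os.Create(path)
	if err != nil {
		log.Print("Error while creating filepath for meta.json")
		return err
	}
	if err := encodeJobMeta(f, meta); err != nil {
		log.Print("Error while encoding job metadata to meta.json file")
		f.Close() // not in the diff: close anyway so the handle is not leaked
		return err
	}
	if err := f.Close(); err != nil {
		log.Print("Error while closing meta.json file")
		return err
	}
	return nil
}

func main() {
	if err := storeJobMeta("meta.json", &jobMeta{JobID: 123, Cluster: "testcluster"}); err != nil {
		log.Fatal(err)
	}
}
```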
@@ -287,17 +291,21 @@ func (fsa *FsArchive) ImportJob(
 	}
 	dir := getPath(&job, fsa.path, "")
 	if err := os.MkdirAll(dir, 0777); err != nil {
+		log.Error("Error while creating job archive path")
 		return err
 	}
 	f, err := os.Create(path.Join(dir, "meta.json"))
 	if err != nil {
+		log.Error("Error while creating filepath for meta.json")
 		return err
 	}
 	if err := EncodeJobMeta(f, jobMeta); err != nil {
+		log.Error("Error while encoding job metadata to meta.json file")
 		return err
 	}
 	if err := f.Close(); err != nil {
+		log.Warn("Error while closing meta.json file")
 		return err
 	}
@@ -325,11 +333,17 @@ func (fsa *FsArchive) ImportJob(
 	f, err = os.Create(path.Join(dir, "data.json"))
 	if err != nil {
+		log.Error("Error while creating filepath for data.json")
 		return err
 	}
 	if err := EncodeJobData(f, jobData); err != nil {
+		log.Error("Error while encoding job metricdata to data.json file")
 		return err
 	}
-	return f.Close()
+	if err := f.Close(); err != nil {
+		log.Warn("Error while closing data.json file")
+		return err
+	}
+	return nil
 }
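
The last hunk replaces the trailing `return f.Close()` with the checked close above plus a plain return. That avoids a double close: once the explicit `f.Close()` block runs, a second `Close` on an `*os.File` returns an error even though the write succeeded. A quick demonstration:

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "data-*.json")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	fmt.Println("first close: ", f.Close()) // <nil>
	fmt.Println("second close:", f.Close()) // ...: file already closed
}
```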