Add tests and testdata for S3 backend

Commit 256d1b85f6 (parent 03a496d477), authored 2024-02-19 09:14:53 +01:00.
8 changed files with 916 additions and 27 deletions.

View File

@@ -227,6 +227,94 @@ func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) {
return DecodeCluster(r)
}
// Iter streams every job stored in the archive over the returned channel.
// The channel is closed once the bucket listing is exhausted, or
// immediately if a listing error occurs.
//
// NOTE(review): the S3 implementation is still work in progress — it
// currently sends empty JobContainer{Meta: nil, Data: nil} placeholders
// and ignores loadMetricData; the commented-out filesystem walk below
// documents the intended per-job meta.json/data.json loading.
func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
	ch := make(chan JobContainer)
	go func() {
		// Close the channel on every exit path (including early error
		// returns) so receivers ranging over it never block forever.
		defer close(ch)

		clusterDirs := s3a.client.ListObjects(context.Background(), s3a.bucket, minio.ListObjectsOptions{Recursive: false})
		for clusterDir := range clusterDirs {
			if clusterDir.Err != nil {
				log.Errorf("Iter() > listing clusters failed: %s", clusterDir.Err.Error())
				return
			}
			// Regular objects at the top level have a non-zero size; only
			// zero-size "directory" keys name a cluster.
			if clusterDir.Size != 0 {
				continue
			}
			// Use the key verbatim as the listing prefix: filepath.Join
			// would strip its trailing "/" and widen the match (prefix
			// "cluster" would also match "cluster2/").
			key := clusterDir.Key
			lvl1Dirs := s3a.client.ListObjects(context.Background(), s3a.bucket, minio.ListObjectsOptions{Recursive: false, Prefix: key})
			for lvl1Dir := range lvl1Dirs {
				if lvl1Dir.Err != nil {
					log.Errorf("Iter() > listing %s failed: %s", key, lvl1Dir.Err.Error())
					return
				}
				ch <- JobContainer{Meta: nil, Data: nil}
			}
			//
			// for _, lvl1Dir := range lvl1Dirs {
			// if !lvl1Dir.IsDir() {
			// // Could be the cluster.json file
			// continue
			// }
			//
			// lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
			// if err != nil {
			// log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
			// }
			//
			// for _, lvl2Dir := range lvl2Dirs {
			// dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
			// startTimeDirs, err := os.ReadDir(dirpath)
			// if err != nil {
			// log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
			// }
			//
			// for _, startTimeDir := range startTimeDirs {
			// if startTimeDir.IsDir() {
			// b, err := os.ReadFile(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
			// if err != nil {
			// log.Errorf("loadJobMeta() > open file error: %v", err)
			// }
			// job, err := loadJobMeta(b)
			// if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
			// log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
			// }
			//
			// if loadMetricData {
			// var isCompressed bool = true
			// filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
			//
			// if !util.CheckFileExists(filename) {
			// filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
			// isCompressed = false
			// }
			//
			// f, err := os.Open(filename)
			// if err != nil {
			// log.Errorf("fsBackend LoadJobData()- %v", err)
			// }
			// defer f.Close()
			//
			// data, err := loadJobData(f, filename, isCompressed)
			// if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
			// log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
			// }
			// ch <- JobContainer{Meta: job, Data: &data}
			// log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
			// } else {
			// ch <- JobContainer{Meta: job, Data: nil}
			// }
			// }
			// }
			// }
			// }
		}
	}()
	return ch
}
// ImportJob uploads a job's metadata (meta.json) and metric data
// (data.json) to the S3 bucket, streaming each through an io.Pipe so the
// encoder goroutine and PutObject run concurrently without buffering the
// whole object in memory.
//
// NOTE(review): this span is a raw diff capture — removed and added lines
// appear side by side and part of the body is hidden behind the hunk
// boundary below (the origin of `job` is not visible here), so the text
// is not compilable as shown. Comments mark which variant is which.
func (s3a *S3Archive) ImportJob(
jobMeta *schema.JobMeta,
jobData *schema.JobData,
@@ -239,36 +327,37 @@ func (s3a *S3Archive) ImportJob(
r, w := io.Pipe()
// OLD (removed): synchronous encode into the pipe writer — an io.Pipe
// write blocks until a reader consumes it, so with no concurrent reader
// this would deadlock.
if err := EncodeJobMeta(w, jobMeta); err != nil {
log.Error("Error while encoding job metadata to meta.json file")
return err
}
// NEW (added): encode in a goroutine so PutObject can read the pipe
// concurrently; the deferred Close signals EOF to the reader.
go func() {
defer w.Close()
if err := EncodeJobMeta(w, jobMeta); err != nil {
log.Error("Error while encoding job metadata to meta.json object")
}
}()
key := getPath(&job, "./", "meta.json")
// OLD (removed): unsafe.Sizeof(job) is the struct's in-memory size, not
// the encoded JSON length — the wrong object size; the result was also
// left unchecked.
s3a.client.PutObject(context.Background(),
s3a.bucket, key, r,
int64(unsafe.Sizeof(job)), minio.PutObjectOptions{})
// NEW (added): object size -1 lets the client stream until EOF.
_, e := s3a.client.PutObject(context.Background(),
s3a.bucket, key, r, -1, minio.PutObjectOptions{})
// OLD (removed): explicit w.Close() check, superseded by the deferred
// Close in the encoder goroutine above.
if err := w.Close(); err != nil {
log.Warn("Error while closing meta.json file")
return err
// NEW (added): surface the upload error to the caller.
if e != nil {
log.Errorf("Put error %#v", e)
return e
}
// Fresh pipe for the data.json upload (same streaming pattern).
r, w = io.Pipe()
//
// f, err = os.Create(path.Join(dir, "data.json"))
// if err != nil {
// log.Error("Error while creating filepath for data.json")
// return err
// }
// if err := EncodeJobData(f, jobData); err != nil {
// log.Error("Error while encoding job metricdata to data.json file")
// return err
// }
// if err := f.Close(); err != nil {
// log.Warn("Error while closing data.json file")
// }
// return err
//
go func() {
defer w.Close()
if err := EncodeJobData(w, jobData); err != nil {
log.Error("Error while encoding job metricdata to data.json object")
}
}()
key = getPath(&job, "./", "data.json")
_, e = s3a.client.PutObject(context.Background(),
s3a.bucket, key, r, -1, minio.PutObjectOptions{})
if e != nil {
log.Errorf("Put error %#v", e)
return e
}
return nil
}
@@ -315,4 +404,3 @@ func (s3a *S3Archive) GetClusters() []string {
//
// func (s3a *S3Archive) CompressLast(starttime int64) int64
//
// func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer