Add S3 loadJobMeta

This commit is contained in:
2024-01-25 15:43:06 +01:00
parent 578a270932
commit 3bb5ae696a
6 changed files with 267 additions and 89 deletions

View File

@@ -59,19 +59,13 @@ func getDirectory(
func getPath(
job *schema.Job,
rootPath string,
file string) string {
file string,
) string {
return filepath.Join(
getDirectory(job, rootPath), file)
}
func loadJobMeta(filename string) (*schema.JobMeta, error) {
b, err := os.ReadFile(filename)
if err != nil {
log.Errorf("loadJobMeta() > open file error: %v", err)
return &schema.JobMeta{}, err
}
func loadJobMeta(b []byte) (*schema.JobMeta, error) {
if config.Keys.Validate {
if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
@@ -83,7 +77,6 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err
@@ -116,7 +109,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
}
func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
var config FsArchiveConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warnf("Init() > Unmarshal error: %#v", err)
@@ -242,7 +234,6 @@ func (fsa *FsArchive) Exists(job *schema.Job) bool {
}
func (fsa *FsArchive) Clean(before int64, after int64) {
if after == 0 {
after = math.MaxInt64
}
@@ -358,7 +349,6 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) {
}
func (fsa *FsArchive) CompressLast(starttime int64) int64 {
filename := filepath.Join(fsa.path, "compress.txt")
b, err := os.ReadFile(filename)
if err != nil {
@@ -391,11 +381,15 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
filename := getPath(job, fsa.path, "meta.json")
return loadJobMeta(filename)
b, err := os.ReadFile(filename)
if err != nil {
log.Errorf("loadJobMeta() > open file error: %v", err)
return &schema.JobMeta{}, err
}
return loadJobMeta(b)
}
func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil {
log.Errorf("LoadClusterCfg() > open file error: %v", err)
@@ -410,7 +404,6 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
}
func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
ch := make(chan JobContainer)
go func() {
clustersDir, err := os.ReadDir(fsa.path)
@@ -447,7 +440,11 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
b, err := os.ReadFile(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
if err != nil {
log.Errorf("loadJobMeta() > open file error: %v", err)
}
job, err := loadJobMeta(b)
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
}
@@ -481,7 +478,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
}
func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
@@ -510,8 +506,8 @@ func (fsa *FsArchive) GetClusters() []string {
func (fsa *FsArchive) ImportJob(
jobMeta *schema.JobMeta,
jobData *schema.JobData) error {
jobData *schema.JobData,
) error {
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),