diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index beeb24d..28768a3 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -5,9 +5,16 @@ package archive import ( + "bufio" + "bytes" + "compress/gzip" "encoding/json" "fmt" + "io" + "path/filepath" + "strconv" + "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -52,9 +59,69 @@ type JobContainer struct { Data *schema.JobData } -var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) -var ar ArchiveBackend -var useArchive bool +var ( + cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) + ar ArchiveBackend + useArchive bool +) + +func getPath( + job *schema.Job, + rootPath string, + file string, +) string { + return filepath.Join( + getDirectory(job, rootPath), file) +} + +func getDirectory( + job *schema.Job, + rootPath string, +) string { + lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) + + return filepath.Join( + rootPath, + job.Cluster, + lvl1, lvl2, + strconv.FormatInt(job.StartTime.Unix(), 10)) +} + +func loadJobMeta(b []byte) (*schema.JobMeta, error) { + if config.Keys.Validate { + if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil { + return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err) + } + } + + return DecodeJobMeta(bytes.NewReader(b)) +} + +func loadJobData(f io.Reader, key string, isCompressed bool) (schema.JobData, error) { + if isCompressed { + r, err := gzip.NewReader(f) + if err != nil { + log.Errorf(" %v", err) + return nil, err + } + defer r.Close() + + if config.Keys.Validate { + if err := schema.Validate(schema.Data, r); err != nil { + return schema.JobData{}, fmt.Errorf("validate job data: %v", err) + } + } + + return DecodeJobData(r, key) + } else { + if config.Keys.Validate { + if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil { + return schema.JobData{}, fmt.Errorf("validate job data: %v", err) + } + } + return DecodeJobData(bufio.NewReader(f), key) + } +} func Init(rawConfig json.RawMessage, disableArchive bool) error { useArchive = !disableArchive @@ -95,8 +162,8 @@ func GetHandle() ArchiveBackend { func LoadAveragesFromArchive( job *schema.Job, metrics []string, - data [][]schema.Float) error { - + data [][]schema.Float, +) error { metaFile, err := ar.LoadJobMeta(job) if err != nil { log.Warn("Error while loading job metadata from archiveBackend") @@ -115,7 +182,6 @@ func LoadAveragesFromArchive( } func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { - metaFile, err := ar.LoadJobMeta(job) if err != nil { log.Warn("Error while loading job metadata from archiveBackend") @@ -128,7 +194,6 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { // If the job is archived, find its `meta.json` file and override the tags list // in that JSON file. If the job is not archived, nothing is done. func UpdateTags(job *schema.Job, tags []*schema.Tag) error { - if job.State == schema.JobStateRunning || !useArchive { return nil } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 71411e6..a8045d6 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -5,9 +5,7 @@ package archive import ( - "bufio" "bytes" - "compress/gzip" "encoding/json" "errors" "fmt" @@ -20,7 +18,6 @@ import ( "text/tabwriter" "time" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -43,71 +40,6 @@ type clusterInfo struct { diskSize float64 } -func getDirectory( - job *schema.Job, - rootPath string, -) string { - lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) - - return filepath.Join( - rootPath, - job.Cluster, - lvl1, lvl2, - strconv.FormatInt(job.StartTime.Unix(), 10)) -} - -func getPath( - job *schema.Job, - rootPath string, - file string, -) string { - return filepath.Join( - getDirectory(job, rootPath), file) -} - -func loadJobMeta(b []byte) (*schema.JobMeta, error) { - if config.Keys.Validate { - if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil { - return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err) - } - } - - return DecodeJobMeta(bytes.NewReader(b)) -} - -func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { - f, err := os.Open(filename) - if err != nil { - log.Errorf("fsBackend LoadJobData()- %v", err) - return nil, err - } - defer f.Close() - - if isCompressed { - r, err := gzip.NewReader(f) - if err != nil { - log.Errorf(" %v", err) - return nil, err - } - defer r.Close() - - if config.Keys.Validate { - if err := schema.Validate(schema.Data, r); err != nil { - return schema.JobData{}, fmt.Errorf("validate job data: %v", err) - } - } - - return DecodeJobData(r, filename) - } else { - if config.Keys.Validate { - if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil { - return schema.JobData{}, fmt.Errorf("validate job data: %v", err) - } - } - return DecodeJobData(bufio.NewReader(f), filename) - } -} - func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { var config FsArchiveConfig if err := json.Unmarshal(rawConfig, &config); err != nil { @@ -368,7 +300,7 @@ func (fsa *FsArchive) CompressLast(starttime int64) int64 { } func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { - var isCompressed bool = true + isCompressed := true filename := getPath(job, fsa.path, "data.json.gz") if !util.CheckFileExists(filename) { @@ -376,7 +308,13 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { isCompressed = false } - return loadJobData(filename, isCompressed) + f, err := os.Open(filename) + if err != nil { + log.Errorf("fsBackend LoadJobData()- %v", err) + return nil, err + } + defer f.Close() + return loadJobData(f, filename, isCompressed) } func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { @@ -393,13 +331,11 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) if err != nil { log.Errorf("LoadClusterCfg() > open file error: %v", err) - // if config.Keys.Validate { if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil { log.Warnf("Validate cluster config: %v\n", err) return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err) } } - // } return DecodeCluster(bytes.NewReader(b)) } @@ -458,7 +394,13 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { isCompressed = false } - data, err := loadJobData(filename, isCompressed) + f, err := os.Open(filename) + if err != nil { + log.Errorf("fsBackend LoadJobData()- %v", err) + } + defer f.Close() + + data, err := loadJobData(f, filename, isCompressed) if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) { log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) } diff --git a/pkg/archive/s3Backend.go b/pkg/archive/s3Backend.go index 512cd1c..fe5a479 100644 --- a/pkg/archive/s3Backend.go +++ b/pkg/archive/s3Backend.go @@ -9,14 +9,17 @@ import ( "encoding/json" "fmt" "io" + "path/filepath" "strconv" "strings" "time" + "unsafe" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/pkg/errors" ) type S3ArchiveConfig struct { @@ -33,6 +36,30 @@ type S3Archive struct { clusters []string } +func (s3a *S3Archive) stat(object string) (*minio.ObjectInfo, error) { + objectStat, e := s3a.client.StatObject(context.Background(), + s3a.bucket, + object, minio.GetObjectOptions{}) + + if e != nil { + errResponse := minio.ToErrorResponse(e) + if errResponse.Code == "AccessDenied" { + return nil, errors.Wrap(e, "AccessDenied") + } + if errResponse.Code == "NoSuchBucket" { + return nil, errors.Wrap(e, "NoSuchBucket") + } + if errResponse.Code == "InvalidBucketName" { + return nil, errors.Wrap(e, "InvalidBucketName") + } + if errResponse.Code == "NoSuchKey" { + return nil, errors.Wrap(e, "NoSuchKey") + } + return nil, e + } + return &objectStat, nil +} + func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) { var config S3ArchiveConfig var err error @@ -152,6 +179,8 @@ func (s3a *S3Archive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { err = fmt.Errorf("Init() : Get version object failed") return nil, err } + defer r.Close() + b, err := io.ReadAll(r) if err != nil { log.Errorf("Init() : %v", err) @@ -162,23 +191,119 @@ func (s3a *S3Archive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { } func (s3a *S3Archive) LoadJobData(job *schema.Job) (schema.JobData, error) { - var err error - return schema.JobData{}, err + isCompressed := true + key := getPath(job, "./", "data.json.gz") + + _, err := s3a.stat(key) + if err != nil { + if err.Error() == "NoSuchKey" { + key = getPath(job, "./", "data.json") + isCompressed = false + } + } + + r, err := s3a.client.GetObject(context.Background(), + s3a.bucket, key, minio.GetObjectOptions{}) + if err != nil { + err = fmt.Errorf("Init() : Get version object failed") + return nil, err + } + defer r.Close() + + return loadJobData(r, key, isCompressed) } -// func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) { -// var err error -// return &schema.Cluster{}, err -// } -// -// func (s3a *S3Archive) StoreJobMeta(jobMeta *schema.JobMeta) error -func (s3a *S3Archive) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) error { - var err error - return err +func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) { + key := filepath.Join("./", name, "cluster.json") + + r, err := s3a.client.GetObject(context.Background(), + s3a.bucket, key, minio.GetObjectOptions{}) + if err != nil { + err = fmt.Errorf("Init() : Get version object failed") + return nil, err + } + defer r.Close() + + return DecodeCluster(r) +} + +func (s3a *S3Archive) ImportJob( + jobMeta *schema.JobMeta, + jobData *schema.JobData, +) error { + job := schema.Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + StartTimeUnix: jobMeta.StartTime, + } + + r, w := io.Pipe() + + if err := EncodeJobMeta(w, jobMeta); err != nil { + log.Error("Error while encoding job metadata to meta.json file") + return err + } + + key := getPath(&job, "./", "meta.json") + s3a.client.PutObject(context.Background(), + s3a.bucket, key, r, + int64(unsafe.Sizeof(job)), minio.PutObjectOptions{}) + + if err := w.Close(); err != nil { + log.Warn("Error while closing meta.json file") + return err + } + + // + // f, err = os.Create(path.Join(dir, "data.json")) + // if err != nil { + // log.Error("Error while creating filepath for data.json") + // return err + // } + // if err := EncodeJobData(f, jobData); err != nil { + // log.Error("Error while encoding job metricdata to data.json file") + // return err + // } + // if err := f.Close(); err != nil { + // log.Warn("Error while closing data.json file") + // } + // return err + // + + return nil +} + +func (s3a *S3Archive) StoreJobMeta(jobMeta *schema.JobMeta) error { + job := schema.Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + StartTimeUnix: jobMeta.StartTime, + } + + r, w := io.Pipe() + + if err := EncodeJobMeta(w, jobMeta); err != nil { + log.Error("Error while encoding job metadata to meta.json file") + return err + } + + key := getPath(&job, "./", "meta.json") + s3a.client.PutObject(context.Background(), + s3a.bucket, key, r, + int64(unsafe.Sizeof(job)), minio.PutObjectOptions{}) + + if err := w.Close(); err != nil { + log.Warn("Error while closing meta.json file") + return err + } + + return nil +} + +func (s3a *S3Archive) GetClusters() []string { + return s3a.clusters } -// -// func (s3a *S3Archive) GetClusters() []string // // func (s3a *S3Archive) CleanUp(jobs []*schema.Job) // diff --git a/pkg/archive/s3Backend_test.go b/pkg/archive/s3Backend_test.go index 4fdf03d..9108c5e 100644 --- a/pkg/archive/s3Backend_test.go +++ b/pkg/archive/s3Backend_test.go @@ -57,3 +57,46 @@ func TestS3LoadJobMeta(t *testing.T) { t.Fail() } } + +func TestS3LoadJobData(t *testing.T) { + var s3a S3Archive + _, err := s3a.Init(json.RawMessage("{\"endpoint\":\"192.168.1.10:9100\",\"accessKeyID\":\"uACSaCN2Chiotpnr4bBS\",\"secretAccessKey\":\"MkEbBsFvMii1K5GreUriTJZxH359B1n28Au9Kaml\",\"bucket\":\"cc-archive\",\"useSSL\":false}")) + if err != nil { + t.Fatal(err) + } + + jobIn := schema.Job{BaseJob: schema.JobDefaults} + jobIn.StartTime = time.Unix(1675954353, 0) + jobIn.JobID = 398764 + jobIn.Cluster = "fritz" + + data, err := s3a.LoadJobData(&jobIn) + if err != nil { + t.Fatal(err) + } + + for _, scopes := range data { + // fmt.Printf("Metric name: %s\n", name) + + if _, exists := scopes[schema.MetricScopeNode]; !exists { + t.Fail() + } + } +} + +func TestS3LoadCluster(t *testing.T) { + var s3a S3Archive + _, err := s3a.Init(json.RawMessage("{\"endpoint\":\"192.168.1.10:9100\",\"accessKeyID\":\"uACSaCN2Chiotpnr4bBS\",\"secretAccessKey\":\"MkEbBsFvMii1K5GreUriTJZxH359B1n28Au9Kaml\",\"bucket\":\"cc-archive\",\"useSSL\":false}")) + if err != nil { + t.Fatal(err) + } + + cfg, err := s3a.LoadClusterCfg("fritz") + if err != nil { + t.Fatal(err) + } + + if cfg.SubClusters[0].CoresPerSocket != 36 { + t.Fail() + } +}