From 10aa2bfbd3f0da15b1ff85f6f35567dfac7b1abc Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Mon, 15 Dec 2025 11:24:12 +0100
Subject: [PATCH] Add support for ClusterConfig

---
 pkg/archive/archive.go   |  5 ++-
 pkg/archive/fsBackend.go | 67 ++++++++++++++++++++++++++++++++++++++--
 pkg/archive/json.go      |  8 +++++
 pkg/archive/s3Backend.go | 63 +++++++++++++++++++++++++++++++++++--
 4 files changed, 137 insertions(+), 6 deletions(-)

diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go
index ca6373f..71933f2 100644
--- a/pkg/archive/archive.go
+++ b/pkg/archive/archive.go
@@ -132,6 +132,10 @@
 	// Overwrites existing metadata for the same job ID, cluster, and start time.
 	StoreJobMeta(jobMeta *schema.Job) error
 
+	// StoreClusterCfg stores the cluster configuration to the archive.
+	// Overwrites an existing configuration for the same cluster.
+	StoreClusterCfg(name string, config *schema.Cluster) error
+
 	// ImportJob stores both job metadata and performance data to the archive.
 	// This is typically used during initial job archiving.
 	ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error
@@ -277,7 +281,6 @@ func InitBackend(rawConfig json.RawMessage) (ArchiveBackend, error) {
 	return backend, nil
 }
 
-
 // LoadAveragesFromArchive loads average metric values for a job from the archive.
 // This is a helper function that extracts average values from job statistics.
 //
diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index 63b1708..1e9d7db 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -168,6 +168,33 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
 
 	b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt"))
 	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			// Check if directory is empty (ignoring hidden files/dirs)
+			entries, err := os.ReadDir(fsa.path)
+			if err != nil {
+				cclog.Errorf("fsBackend Init() > ReadDir() error: %v", err)
+				return 0, err
+			}
+
+			isEmpty := true
+			for _, e := range entries {
+				if e.Name()[0] != '.' {
+					isEmpty = false
+					break
+				}
+			}
+
+			if isEmpty {
+				cclog.Infof("fsBackend Init() > Bootstrapping new archive at %s", fsa.path)
+				versionStr := fmt.Sprintf("%d\n", Version)
+				if err := os.WriteFile(filepath.Join(fsa.path, "version.txt"), []byte(versionStr), 0644); err != nil {
+					cclog.Errorf("fsBackend Init() > failed to create version.txt: %v", err)
+					return 0, err
+				}
+				return Version, nil
+			}
+		}
+
 		cclog.Warnf("fsBackend Init() - %v", err)
 		return 0, err
 	}
@@ -449,13 +476,15 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 	b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
 	if err != nil {
 		cclog.Errorf("LoadClusterCfg() > open file error: %v", err)
-		// if config.Keys.Validate {
+		return &schema.Cluster{}, err
+	}
+
+	if config.Keys.Validate {
 		if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
 			cclog.Warnf("Validate cluster config: %v\n", err)
 			return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
 		}
 	}
-	// }
 
 	return DecodeCluster(bytes.NewReader(b))
 }
@@ -588,3 +617,37 @@ func (fsa *FsArchive) ImportJob(
 	}
 	return err
 }
+
+func (fsa *FsArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
+	dir := filepath.Join(fsa.path, name)
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		cclog.Errorf("StoreClusterCfg() > mkdir error: %v", err)
+		return err
+	}
+
+	f, err := os.Create(filepath.Join(dir, "cluster.json"))
+	if err != nil {
+		cclog.Errorf("StoreClusterCfg() > create file error: %v", err)
+		return err
+	}
+	defer f.Close()
+
+	if err := EncodeCluster(f, config); err != nil {
+		cclog.Errorf("StoreClusterCfg() > encode error: %v", err)
+		return err
+	}
+
+	// Update clusters list if new
+	found := false
+	for _, c := range fsa.clusters {
+		if c == name {
+			found = true
+			break
+		}
+	}
+	if !found {
+		fsa.clusters = append(fsa.clusters, name)
+	}
+
+	return nil
+}
diff --git a/pkg/archive/json.go b/pkg/archive/json.go
index a8f4cae..75c3953 100644
--- a/pkg/archive/json.go
+++ b/pkg/archive/json.go
@@ -113,3 +113,11 @@ func EncodeJobMeta(w io.Writer, d *schema.Job) error {
 
 	return nil
 }
+
+func EncodeCluster(w io.Writer, c *schema.Cluster) error {
+	if err := json.NewEncoder(w).Encode(c); err != nil {
+		cclog.Warn("Error while encoding cluster json")
+		return err
+	}
+	return nil
+}
diff --git a/pkg/archive/s3Backend.go b/pkg/archive/s3Backend.go
index c974899..5b3d9f0 100644
--- a/pkg/archive/s3Backend.go
+++ b/pkg/archive/s3Backend.go
@@ -10,6 +10,7 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"math"
@@ -27,6 +28,7 @@ import (
 	awsconfig "github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 )
 
 // S3ArchiveConfig holds the configuration for the S3 archive backend.
@@ -135,6 +137,24 @@ func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) {
 		Key:    aws.String(versionKey),
 	})
 	if err != nil {
+		// If version.txt is missing, try to bootstrap (assuming new archive)
+		var noKey *types.NoSuchKey
+		// Check for different error types that indicate missing key
+		if errors.As(err, &noKey) || strings.Contains(err.Error(), "NoSuchKey") || strings.Contains(err.Error(), "404") {
+			cclog.Infof("S3Archive Init() > Bootstrapping new archive at bucket %s", s3a.bucket)
+			versionStr := fmt.Sprintf("%d\n", Version)
+			_, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
+				Bucket: aws.String(s3a.bucket),
+				Key:    aws.String(versionKey),
+				Body:   strings.NewReader(versionStr),
+			})
+			if err != nil {
+				cclog.Errorf("S3Archive Init() > failed to create version.txt: %v", err)
+				return 0, err
+			}
+			return Version, nil
+		}
+
 		cclog.Warnf("S3Archive Init() > cannot read version.txt: %v", err)
 		return 0, err
 	}
@@ -411,9 +431,11 @@ func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 		return nil, err
 	}
 
-	if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
-		cclog.Warnf("Validate cluster config: %v\n", err)
-		return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
+	if config.Keys.Validate {
+		if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
+			cclog.Warnf("Validate cluster config: %v\n", err)
+			return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
+		}
 	}
 
 	return DecodeCluster(bytes.NewReader(b))
@@ -833,3 +855,38 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
 
 	return ch
 }
+
+func (s3a *S3Archive) StoreClusterCfg(name string, config *schema.Cluster) error {
+	ctx := context.Background()
+	key := fmt.Sprintf("%s/cluster.json", name)
+
+	var buf bytes.Buffer
+	if err := EncodeCluster(&buf, config); err != nil {
+		cclog.Error("S3Archive StoreClusterCfg() > encoding error")
+		return err
+	}
+
+	_, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{
+		Bucket: aws.String(s3a.bucket),
+		Key:    aws.String(key),
+		Body:   bytes.NewReader(buf.Bytes()),
+	})
+	if err != nil {
+		cclog.Errorf("S3Archive StoreClusterCfg() > PutObject error: %v", err)
+		return err
+	}
+
+	// Update clusters list if new
+	found := false
+	for _, c := range s3a.clusters {
+		if c == name {
+			found = true
+			break
+		}
+	}
+	if !found {
+		s3a.clusters = append(s3a.clusters, name)
+	}
+
+	return nil
+}