ClusterCockpit/cc-backend (https://github.com/ClusterCockpit/cc-backend)

Commit: Add support for ClusterConfig
@@ -132,6 +132,10 @@ type ArchiveBackend interface {
 	// Overwrites existing metadata for the same job ID, cluster, and start time.
 	StoreJobMeta(jobMeta *schema.Job) error
 
+	// StoreClusterCfg stores the cluster configuration to the archive.
+	// Overwrites an existing configuration for the same cluster.
+	StoreClusterCfg(name string, config *schema.Cluster) error
+
 	// ImportJob stores both job metadata and performance data to the archive.
 	// This is typically used during initial job archiving.
 	ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error
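With this addition, every ArchiveBackend implementation must be able to persist a cluster configuration. A minimal sketch of a caller using the new method through the interface, written as if inside the archive package; the helper name and the cluster literal are illustrative, not part of this commit:

// storeExample is a hypothetical caller; only StoreClusterCfg and the
// schema.Cluster type come from the code above.
func storeExample(backend ArchiveBackend) error {
	cluster := &schema.Cluster{Name: "fritz"} // made-up cluster name
	// Per the doc comment, this overwrites any existing configuration
	// stored for "fritz".
	return backend.StoreClusterCfg(cluster.Name, cluster)
}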
@@ -277,7 +281,6 @@ func InitBackend(rawConfig json.RawMessage) (ArchiveBackend, error) {
 	return backend, nil
 }
 
-
 // LoadAveragesFromArchive loads average metric values for a job from the archive.
 // This is a helper function that extracts average values from job statistics.
 //
@@ -168,6 +168,33 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
 
 	b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt"))
 	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			// Check if directory is empty (ignoring hidden files/dirs)
+			entries, err := os.ReadDir(fsa.path)
+			if err != nil {
+				cclog.Errorf("fsBackend Init() > ReadDir() error: %v", err)
+				return 0, err
+			}
+
+			isEmpty := true
+			for _, e := range entries {
+				if e.Name()[0] != '.' {
+					isEmpty = false
+					break
+				}
+			}
+
+			if isEmpty {
+				cclog.Infof("fsBackend Init() > Bootstrapping new archive at %s", fsa.path)
+				versionStr := fmt.Sprintf("%d\n", Version)
+				if err := os.WriteFile(filepath.Join(fsa.path, "version.txt"), []byte(versionStr), 0644); err != nil {
+					cclog.Errorf("fsBackend Init() > failed to create version.txt: %v", err)
+					return 0, err
+				}
+				return Version, nil
+			}
+		}
+
 		cclog.Warnf("fsBackend Init() - %v", err)
 		return 0, err
 	}
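The bootstrap path only fires when the archive directory exists but contains nothing visible: entries whose names start with '.' (for example ".git" or ".DS_Store") do not count. The same predicate, isolated as a self-contained sketch; dirIsEmpty and the main harness are illustrative, not part of the commit:

package main

import (
	"fmt"
	"os"
	"strings"
)

// dirIsEmpty reports whether path contains no visible entries.
// Hidden entries (names starting with '.') are ignored, matching the
// bootstrap check in FsArchive.Init above.
func dirIsEmpty(path string) (bool, error) {
	entries, err := os.ReadDir(path)
	if err != nil {
		return false, err
	}
	for _, e := range entries {
		if !strings.HasPrefix(e.Name(), ".") {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	empty, err := dirIsEmpty(".")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("empty:", empty)
}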
@@ -449,13 +476,15 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 	b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
 	if err != nil {
 		cclog.Errorf("LoadClusterCfg() > open file error: %v", err)
-		// if config.Keys.Validate {
 		return &schema.Cluster{}, err
 	}
+
+	if config.Keys.Validate {
 		if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
 			cclog.Warnf("Validate cluster config: %v\n", err)
 			return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
 		}
+	}
-	// }
+
 	return DecodeCluster(bytes.NewReader(b))
 }
@@ -588,3 +617,37 @@ func (fsa *FsArchive) ImportJob(
 	}
 	return err
 }
+
+func (fsa *FsArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
+	dir := filepath.Join(fsa.path, name)
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		cclog.Errorf("StoreClusterCfg() > mkdir error: %v", err)
+		return err
+	}
+
+	f, err := os.Create(filepath.Join(dir, "cluster.json"))
+	if err != nil {
+		cclog.Errorf("StoreClusterCfg() > create file error: %v", err)
+		return err
+	}
+	defer f.Close()
+
+	if err := EncodeCluster(f, config); err != nil {
+		cclog.Errorf("StoreClusterCfg() > encode error: %v", err)
+		return err
+	}
+
+	// Update clusters list if new
+	found := false
+	for _, c := range fsa.clusters {
+		if c == name {
+			found = true
+			break
+		}
+	}
+	if !found {
+		fsa.clusters = append(fsa.clusters, name)
+	}
+
+	return nil
+}
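A hedged round-trip sketch against the filesystem backend, written as if inside the archive package. The archive path, the cluster name, and the exact raw-config key ("path") are assumptions for illustration; imports of encoding/json and log are implied. Note that MkdirAll is called with mode 0777, which the process umask narrows in practice.

// Hypothetical round-trip: store a config, then load it back.
var fsa FsArchive
if _, err := fsa.Init(json.RawMessage(`{"path": "./var/job-archive"}`)); err != nil {
	log.Fatal(err) // with the bootstrap above, an empty directory is fine
}
if err := fsa.StoreClusterCfg("fritz", &schema.Cluster{Name: "fritz"}); err != nil {
	log.Fatal(err)
}
cfg, err := fsa.LoadClusterCfg("fritz") // reads <path>/fritz/cluster.json
if err != nil {
	log.Fatal(err)
}
_ = cfg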
@@ -113,3 +113,11 @@ func EncodeJobMeta(w io.Writer, d *schema.Job) error {
 
 	return nil
 }
+
+func EncodeCluster(w io.Writer, c *schema.Cluster) error {
+	if err := json.NewEncoder(w).Encode(c); err != nil {
+		cclog.Warn("Error while encoding cluster json")
+		return err
+	}
+	return nil
+}
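EncodeCluster takes an io.Writer, which is what lets both backends share it: the filesystem backend passes the *os.File it just created, while the S3 backend passes an in-memory bytes.Buffer. A small in-package sketch; the JSON field names depend on the schema struct tags, so the output shape is an assumption:

var buf bytes.Buffer
if err := EncodeCluster(&buf, &schema.Cluster{Name: "fritz"}); err != nil {
	log.Fatal(err)
}
fmt.Print(buf.String()) // one JSON object followed by a newline (json.Encoder.Encode appends '\n')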
@@ -10,6 +10,7 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"math"
@@ -27,6 +28,7 @@ import (
 	awsconfig "github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 )
 
 // S3ArchiveConfig holds the configuration for the S3 archive backend.
@@ -135,6 +137,24 @@ func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) {
 		Key:    aws.String(versionKey),
 	})
 	if err != nil {
+		// If version.txt is missing, try to bootstrap (assuming new archive)
+		var noKey *types.NoSuchKey
+		// Check for different error types that indicate missing key
+		if errors.As(err, &noKey) || strings.Contains(err.Error(), "NoSuchKey") || strings.Contains(err.Error(), "404") {
+			cclog.Infof("S3Archive Init() > Bootstrapping new archive at bucket %s", s3a.bucket)
+			versionStr := fmt.Sprintf("%d\n", Version)
+			_, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
+				Bucket: aws.String(s3a.bucket),
+				Key:    aws.String(versionKey),
+				Body:   strings.NewReader(versionStr),
+			})
+			if err != nil {
+				cclog.Errorf("S3Archive Init() > failed to create version.txt: %v", err)
+				return 0, err
+			}
+			return Version, nil
+		}
+
 		cclog.Warnf("S3Archive Init() > cannot read version.txt: %v", err)
 		return 0, err
 	}
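Unlike the filesystem backend, there is no portable "directory is empty" check for a bucket, so this code classifies the GetObject error instead. The same test isolated as an in-package sketch; the helper name isMissingKey is illustrative:

// errors.As catches the typed *types.NoSuchKey that aws-sdk-go-v2 returns
// when GetObject targets a missing key; the string checks are a fallback
// for S3-compatible stores that surface plain "NoSuchKey" or 404 messages.
func isMissingKey(err error) bool {
	var noKey *types.NoSuchKey
	return errors.As(err, &noKey) ||
		strings.Contains(err.Error(), "NoSuchKey") ||
		strings.Contains(err.Error(), "404")
}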
@@ -411,10 +431,12 @@ func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 		return nil, err
 	}
 
+	if config.Keys.Validate {
 		if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
 			cclog.Warnf("Validate cluster config: %v\n", err)
 			return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
 		}
+	}
 
 	return DecodeCluster(bytes.NewReader(b))
 }
@@ -833,3 +855,38 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
 
 	return ch
 }
+
+func (s3a *S3Archive) StoreClusterCfg(name string, config *schema.Cluster) error {
+	ctx := context.Background()
+	key := fmt.Sprintf("%s/cluster.json", name)
+
+	var buf bytes.Buffer
+	if err := EncodeCluster(&buf, config); err != nil {
+		cclog.Error("S3Archive StoreClusterCfg() > encoding error")
+		return err
+	}
+
+	_, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{
+		Bucket: aws.String(s3a.bucket),
+		Key:    aws.String(key),
+		Body:   bytes.NewReader(buf.Bytes()),
+	})
+	if err != nil {
+		cclog.Errorf("S3Archive StoreClusterCfg() > PutObject error: %v", err)
+		return err
+	}
+
+	// Update clusters list if new
+	found := false
+	for _, c := range s3a.clusters {
+		if c == name {
+			found = true
+			break
+		}
+	}
+	if !found {
+		s3a.clusters = append(s3a.clusters, name)
+	}
+
+	return nil
+}
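Both backends now store configurations under the same logical layout, <cluster>/cluster.json, as a file path in the filesystem backend and as an object key here, so callers can stay backend-agnostic. A hypothetical helper on top of InitBackend from this package; the name and usage are illustrative:

// upsertCluster initializes whichever backend the raw config selects and
// writes the cluster configuration through the common interface.
func upsertCluster(rawConfig json.RawMessage, c *schema.Cluster) error {
	backend, err := InitBackend(rawConfig)
	if err != nil {
		return err
	}
	return backend.StoreClusterCfg(c.Name, c)
}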