// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"os"
	"strconv"
	"strings"
	"text/tabwriter"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/config"
	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
	"github.com/ClusterCockpit/cc-lib/schema"
	"github.com/ClusterCockpit/cc-lib/util"
	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3ArchiveConfig holds the configuration for the S3 archive backend.
type S3ArchiveConfig struct {
	Endpoint     string `json:"endpoint"`     // S3 endpoint URL (optional, for MinIO/localstack)
	AccessKey    string `json:"accessKey"`    // AWS access key ID
	SecretKey    string `json:"secretKey"`    // AWS secret access key
	Bucket       string `json:"bucket"`       // S3 bucket name
	Region       string `json:"region"`       // AWS region
	UsePathStyle bool   `json:"usePathStyle"` // Use path-style URLs (required for MinIO)
}

// S3Archive implements ArchiveBackend using AWS S3 or S3-compatible object storage.
// Jobs are stored as objects with keys mirroring the filesystem structure.
//
// Object key structure: <cluster>/<jobID/1000>/<jobID%1000, zero-padded to 3 digits>/<startTime>/meta.json
type S3Archive struct {
	client   *s3.Client // AWS S3 client
	bucket   string     // S3 bucket name
	clusters []string   // List of discovered cluster names
}

// getS3Key generates the S3 object key for a job file.
func getS3Key(job *schema.Job, file string) string {
	lvl1 := fmt.Sprintf("%d", job.JobID/1000)
	lvl2 := fmt.Sprintf("%03d", job.JobID%1000)
	startTime := strconv.FormatInt(job.StartTime, 10)
	return fmt.Sprintf("%s/%s/%s/%s/%s", job.Cluster, lvl1, lvl2, startTime, file)
}

// getS3Directory generates the S3 key prefix for a job directory.
func getS3Directory(job *schema.Job) string {
	lvl1 := fmt.Sprintf("%d", job.JobID/1000)
	lvl2 := fmt.Sprintf("%03d", job.JobID%1000)
	startTime := strconv.FormatInt(job.StartTime, 10)
	return fmt.Sprintf("%s/%s/%s/%s/", job.Cluster, lvl1, lvl2, startTime)
}
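// The values in the examples below (endpoint, credentials, bucket, job ID,
// start time) are made-up illustrations, not defaults of this backend.
//
// A configuration block as consumed by Init might look like:
//
//	{
//	  "endpoint": "http://localhost:9000",
//	  "accessKey": "minioadmin",
//	  "secretKey": "minioadmin",
//	  "bucket": "cc-archive",
//	  "region": "us-east-1",
//	  "usePathStyle": true
//	}
//
// For a job with JobID 1234567 on cluster "fritz" started at Unix time
// 1696000000, the key helpers above yield:
//
//	getS3Key(job, "meta.json") // "fritz/1234/567/1696000000/meta.json"
//	getS3Directory(job)        // "fritz/1234/567/1696000000/"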
func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) {
	var cfg S3ArchiveConfig
	if err := json.Unmarshal(rawConfig, &cfg); err != nil {
		cclog.Warnf("S3Archive Init() > Unmarshal error: %#v", err)
		return 0, err
	}

	if cfg.Bucket == "" {
		err := fmt.Errorf("S3Archive Init(): empty bucket name")
		cclog.Errorf("S3Archive Init() > config error: %v", err)
		return 0, err
	}

	if cfg.Region == "" {
		cfg.Region = "us-east-1" // Default region
	}

	ctx := context.Background()

	// Create custom AWS config
	var awsCfg aws.Config
	var err error

	if cfg.AccessKey != "" && cfg.SecretKey != "" {
		// Use static credentials
		customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
			if cfg.Endpoint != "" {
				return aws.Endpoint{
					URL:               cfg.Endpoint,
					HostnameImmutable: cfg.UsePathStyle,
					Source:            aws.EndpointSourceCustom,
				}, nil
			}
			return aws.Endpoint{}, &aws.EndpointNotFoundError{}
		})

		awsCfg, err = awsconfig.LoadDefaultConfig(ctx,
			awsconfig.WithRegion(cfg.Region),
			awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
				cfg.AccessKey, cfg.SecretKey, "",
			)),
			awsconfig.WithEndpointResolverWithOptions(customResolver),
		)
	} else {
		// Use default credential chain
		awsCfg, err = awsconfig.LoadDefaultConfig(ctx,
			awsconfig.WithRegion(cfg.Region),
		)
	}
	if err != nil {
		cclog.Errorf("S3Archive Init() > failed to load AWS config: %v", err)
		return 0, err
	}

	// Create S3 client with path-style option
	s3a.client = s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		o.UsePathStyle = cfg.UsePathStyle
	})
	s3a.bucket = cfg.Bucket

	// Check if bucket exists and is accessible
	_, err = s3a.client.HeadBucket(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(s3a.bucket),
	})
	if err != nil {
		cclog.Errorf("S3Archive Init() > bucket access error: %v", err)
		return 0, fmt.Errorf("cannot access S3 bucket '%s': %w", s3a.bucket, err)
	}

	// Read version.txt from S3
	versionKey := "version.txt"
	result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(versionKey),
	})
	if err != nil {
		cclog.Warnf("S3Archive Init() > cannot read version.txt: %v", err)
		return 0, err
	}
	defer result.Body.Close()

	versionBytes, err := io.ReadAll(result.Body)
	if err != nil {
		cclog.Errorf("S3Archive Init() > failed to read version.txt: %v", err)
		return 0, err
	}

	version, err := strconv.ParseUint(strings.TrimSuffix(string(versionBytes), "\n"), 10, 64)
	if err != nil {
		cclog.Errorf("S3Archive Init() > version parse error: %v", err)
		return 0, err
	}

	if version != Version {
		return version, fmt.Errorf("unsupported version %d, need %d", version, Version)
	}

	// Discover clusters by listing top-level prefixes
	s3a.clusters = []string{}
	paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
		Bucket:    aws.String(s3a.bucket),
		Delimiter: aws.String("/"),
	})

	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			cclog.Errorf("S3Archive Init() > failed to list clusters: %v", err)
			return 0, err
		}
		for _, prefix := range page.CommonPrefixes {
			if prefix.Prefix != nil {
				clusterName := strings.TrimSuffix(*prefix.Prefix, "/")
				// Filter out non-cluster entries
				if clusterName != "" && clusterName != "version.txt" {
					s3a.clusters = append(s3a.clusters, clusterName)
				}
			}
		}
	}

	cclog.Infof("S3Archive initialized with bucket '%s', found %d clusters", s3a.bucket, len(s3a.clusters))
	return version, nil
}
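// Init assumes the bucket mirrors the layout of the file-system archive:
// a version.txt object at the bucket root, one top-level prefix per cluster
// containing a cluster.json, and one "directory" per job. An illustrative
// listing (names are examples only):
//
//	version.txt
//	fritz/cluster.json
//	fritz/1234/567/1696000000/meta.json
//	fritz/1234/567/1696000000/data.json.gz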
func (s3a *S3Archive) Info() {
	ctx := context.Background()
	fmt.Printf("S3 Job archive bucket: %s\n", s3a.bucket)

	ci := make(map[string]*clusterInfo)

	for _, cluster := range s3a.clusters {
		ci[cluster] = &clusterInfo{dateFirst: time.Now().Unix()}

		// List all jobs for this cluster
		prefix := cluster + "/"
		paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
			Bucket: aws.String(s3a.bucket),
			Prefix: aws.String(prefix),
		})

		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				cclog.Fatalf("S3Archive Info() > failed to list objects: %s", err.Error())
			}

			for _, obj := range page.Contents {
				if obj.Key != nil && strings.HasSuffix(*obj.Key, "/meta.json") {
					ci[cluster].numJobs++

					// Extract starttime from key: cluster/lvl1/lvl2/starttime/meta.json
					parts := strings.Split(*obj.Key, "/")
					if len(parts) >= 4 {
						startTime, err := strconv.ParseInt(parts[3], 10, 64)
						if err == nil {
							ci[cluster].dateFirst = util.Min(ci[cluster].dateFirst, startTime)
							ci[cluster].dateLast = util.Max(ci[cluster].dateLast, startTime)
						}
					}

					if obj.Size != nil {
						ci[cluster].diskSize += float64(*obj.Size) / (1024 * 1024) // Convert to MB
					}
				}
			}
		}
	}

	cit := clusterInfo{dateFirst: time.Now().Unix()}
	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', tabwriter.Debug)
	fmt.Fprintln(w, "cluster\t#jobs\tfrom\tto\tsize (MB)")
	for cluster, clusterInfo := range ci {
		fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%.2f\n", cluster, clusterInfo.numJobs,
			time.Unix(clusterInfo.dateFirst, 0), time.Unix(clusterInfo.dateLast, 0), clusterInfo.diskSize)

		cit.numJobs += clusterInfo.numJobs
		cit.dateFirst = util.Min(cit.dateFirst, clusterInfo.dateFirst)
		cit.dateLast = util.Max(cit.dateLast, clusterInfo.dateLast)
		cit.diskSize += clusterInfo.diskSize
	}
	fmt.Fprintf(w, "TOTAL\t%d\t%s\t%s\t%.2f\n",
		cit.numJobs, time.Unix(cit.dateFirst, 0), time.Unix(cit.dateLast, 0), cit.diskSize)
	w.Flush()
}

func (s3a *S3Archive) Exists(job *schema.Job) bool {
	ctx := context.Background()
	key := getS3Key(job, "meta.json")

	_, err := s3a.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(key),
	})
	return err == nil
}

func (s3a *S3Archive) LoadJobMeta(job *schema.Job) (*schema.Job, error) {
	ctx := context.Background()
	key := getS3Key(job, "meta.json")

	result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		cclog.Errorf("S3Archive LoadJobMeta() > GetObject error: %v", err)
		return nil, err
	}
	defer result.Body.Close()

	b, err := io.ReadAll(result.Body)
	if err != nil {
		cclog.Errorf("S3Archive LoadJobMeta() > read error: %v", err)
		return nil, err
	}

	if config.Keys.Validate {
		if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
			return nil, fmt.Errorf("validate job meta: %v", err)
		}
	}

	return DecodeJobMeta(bytes.NewReader(b))
}

func (s3a *S3Archive) LoadJobData(job *schema.Job) (schema.JobData, error) {
	ctx := context.Background()

	// Try compressed file first
	keyGz := getS3Key(job, "data.json.gz")
	result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(keyGz),
	})
	if err != nil {
		// Try uncompressed file
		key := getS3Key(job, "data.json")
		result, err = s3a.client.GetObject(ctx, &s3.GetObjectInput{
			Bucket: aws.String(s3a.bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			cclog.Errorf("S3Archive LoadJobData() > GetObject error: %v", err)
			return nil, err
		}
		defer result.Body.Close()

		if config.Keys.Validate {
			b, _ := io.ReadAll(result.Body)
			if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
				return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
			}
			return DecodeJobData(bytes.NewReader(b), key)
		}
		return DecodeJobData(result.Body, key)
	}
	defer result.Body.Close()

	// Decompress
	r, err := gzip.NewReader(result.Body)
	if err != nil {
		cclog.Errorf("S3Archive LoadJobData() > gzip error: %v", err)
		return nil, err
	}
	defer r.Close()

	if config.Keys.Validate {
		b, _ := io.ReadAll(r)
		if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
			return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
		}
		return DecodeJobData(bytes.NewReader(b), keyGz)
	}
	return DecodeJobData(r, keyGz)
}
func (s3a *S3Archive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
	ctx := context.Background()

	// Try compressed file first
	keyGz := getS3Key(job, "data.json.gz")
	result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(keyGz),
	})
	if err != nil {
		// Try uncompressed file
		key := getS3Key(job, "data.json")
		result, err = s3a.client.GetObject(ctx, &s3.GetObjectInput{
			Bucket: aws.String(s3a.bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			cclog.Errorf("S3Archive LoadJobStats() > GetObject error: %v", err)
			return nil, err
		}
		defer result.Body.Close()

		if config.Keys.Validate {
			b, _ := io.ReadAll(result.Body)
			if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
				return nil, fmt.Errorf("validate job data: %v", err)
			}
			return DecodeJobStats(bytes.NewReader(b), key)
		}
		return DecodeJobStats(result.Body, key)
	}
	defer result.Body.Close()

	// Decompress
	r, err := gzip.NewReader(result.Body)
	if err != nil {
		cclog.Errorf("S3Archive LoadJobStats() > gzip error: %v", err)
		return nil, err
	}
	defer r.Close()

	if config.Keys.Validate {
		b, _ := io.ReadAll(r)
		if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
			return nil, fmt.Errorf("validate job data: %v", err)
		}
		return DecodeJobStats(bytes.NewReader(b), keyGz)
	}
	return DecodeJobStats(r, keyGz)
}

func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) {
	ctx := context.Background()
	key := fmt.Sprintf("%s/cluster.json", name)

	result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		cclog.Errorf("S3Archive LoadClusterCfg() > GetObject error: %v", err)
		return nil, err
	}
	defer result.Body.Close()

	b, err := io.ReadAll(result.Body)
	if err != nil {
		cclog.Errorf("S3Archive LoadClusterCfg() > read error: %v", err)
		return nil, err
	}

	if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
		cclog.Warnf("Validate cluster config: %v\n", err)
		return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
	}

	return DecodeCluster(bytes.NewReader(b))
}

func (s3a *S3Archive) StoreJobMeta(job *schema.Job) error {
	ctx := context.Background()
	key := getS3Key(job, "meta.json")

	var buf bytes.Buffer
	if err := EncodeJobMeta(&buf, job); err != nil {
		cclog.Error("S3Archive StoreJobMeta() > encoding error")
		return err
	}

	_, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(key),
		Body:   bytes.NewReader(buf.Bytes()),
	})
	if err != nil {
		cclog.Errorf("S3Archive StoreJobMeta() > PutObject error: %v", err)
		return err
	}

	return nil
}

func (s3a *S3Archive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error {
	ctx := context.Background()

	// Upload meta.json
	metaKey := getS3Key(jobMeta, "meta.json")
	var metaBuf bytes.Buffer
	if err := EncodeJobMeta(&metaBuf, jobMeta); err != nil {
		cclog.Error("S3Archive ImportJob() > encoding meta error")
		return err
	}

	_, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(metaKey),
		Body:   bytes.NewReader(metaBuf.Bytes()),
	})
	if err != nil {
		cclog.Errorf("S3Archive ImportJob() > PutObject meta error: %v", err)
		return err
	}

	// Upload data.json
	dataKey := getS3Key(jobMeta, "data.json")
	var dataBuf bytes.Buffer
	if err := EncodeJobData(&dataBuf, jobData); err != nil {
		cclog.Error("S3Archive ImportJob() > encoding data error")
		return err
	}

	_, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(dataKey),
		Body:   bytes.NewReader(dataBuf.Bytes()),
	})
	if err != nil {
		cclog.Errorf("S3Archive ImportJob() > PutObject data error: %v", err)
		return err
	}

	return nil
}

func (s3a *S3Archive) GetClusters() []string {
	return s3a.clusters
}
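// Illustrative call sequence for importing a job; the readers, error handling,
// and the "import" origin tag are assumptions for the sketch, not a prescribed API:
//
//	meta, err := DecodeJobMeta(metaReader)
//	if err != nil {
//		return err
//	}
//	data, err := DecodeJobData(dataReader, "import")
//	if err != nil {
//		return err
//	}
//	return s3a.ImportJob(meta, &data)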
func (s3a *S3Archive) CleanUp(jobs []*schema.Job) {
	ctx := context.Background()
	start := time.Now()

	for _, job := range jobs {
		if job == nil {
			cclog.Errorf("S3Archive CleanUp() error: job is nil")
			continue
		}

		// Delete all files in the job directory
		prefix := getS3Directory(job)
		paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
			Bucket: aws.String(s3a.bucket),
			Prefix: aws.String(prefix),
		})

		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				cclog.Errorf("S3Archive CleanUp() > list error: %v", err)
				continue
			}

			for _, obj := range page.Contents {
				if obj.Key != nil {
					_, err := s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
						Bucket: aws.String(s3a.bucket),
						Key:    obj.Key,
					})
					if err != nil {
						cclog.Errorf("S3Archive CleanUp() > delete error: %v", err)
					}
				}
			}
		}
	}

	cclog.Infof("Retention Service - Remove %d jobs from S3 in %s", len(jobs), time.Since(start))
}

func (s3a *S3Archive) Move(jobs []*schema.Job, targetPath string) {
	ctx := context.Background()

	for _, job := range jobs {
		sourcePrefix := getS3Directory(job)

		// List all objects in source
		paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
			Bucket: aws.String(s3a.bucket),
			Prefix: aws.String(sourcePrefix),
		})

		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				cclog.Errorf("S3Archive Move() > list error: %v", err)
				continue
			}

			for _, obj := range page.Contents {
				if obj.Key == nil {
					continue
				}

				// Compute target key by replacing prefix
				targetKey := strings.Replace(*obj.Key, sourcePrefix, targetPath+"/", 1)

				// Copy object
				_, err := s3a.client.CopyObject(ctx, &s3.CopyObjectInput{
					Bucket:     aws.String(s3a.bucket),
					CopySource: aws.String(fmt.Sprintf("%s/%s", s3a.bucket, *obj.Key)),
					Key:        aws.String(targetKey),
				})
				if err != nil {
					cclog.Errorf("S3Archive Move() > copy error: %v", err)
					continue
				}

				// Delete source object
				_, err = s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
					Bucket: aws.String(s3a.bucket),
					Key:    obj.Key,
				})
				if err != nil {
					cclog.Errorf("S3Archive Move() > delete error: %v", err)
				}
			}
		}
	}
}

func (s3a *S3Archive) Clean(before int64, after int64) {
	ctx := context.Background()

	if after == 0 {
		after = math.MaxInt64
	}

	for _, cluster := range s3a.clusters {
		prefix := cluster + "/"
		paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
			Bucket: aws.String(s3a.bucket),
			Prefix: aws.String(prefix),
		})

		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				cclog.Fatalf("S3Archive Clean() > list error: %s", err.Error())
			}

			for _, obj := range page.Contents {
				if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
					continue
				}

				// Extract starttime from key: cluster/lvl1/lvl2/starttime/meta.json
				parts := strings.Split(*obj.Key, "/")
				if len(parts) < 4 {
					continue
				}

				startTime, err := strconv.ParseInt(parts[3], 10, 64)
				if err != nil {
					cclog.Fatalf("S3Archive Clean() > cannot parse starttime: %s", err.Error())
				}

				if startTime < before || startTime > after {
					// Delete entire job directory
					jobPrefix := strings.Join(parts[:4], "/") + "/"
					jobPaginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
						Bucket: aws.String(s3a.bucket),
						Prefix: aws.String(jobPrefix),
					})

					for jobPaginator.HasMorePages() {
						jobPage, err := jobPaginator.NextPage(ctx)
						if err != nil {
							cclog.Errorf("S3Archive Clean() > list job error: %v", err)
							continue
						}

						for _, jobObj := range jobPage.Contents {
							if jobObj.Key != nil {
								_, err := s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
									Bucket: aws.String(s3a.bucket),
									Key:    jobObj.Key,
								})
								if err != nil {
									cclog.Errorf("S3Archive Clean() > delete error: %v", err)
								}
							}
						}
					}
				}
			}
		}
	}
}
func (s3a *S3Archive) Compress(jobs []*schema.Job) {
	ctx := context.Background()
	var cnt int
	start := time.Now()

	for _, job := range jobs {
		dataKey := getS3Key(job, "data.json")

		// Check if uncompressed file exists and get its size
		headResult, err := s3a.client.HeadObject(ctx, &s3.HeadObjectInput{
			Bucket: aws.String(s3a.bucket),
			Key:    aws.String(dataKey),
		})
		if err != nil {
			continue // File doesn't exist or error
		}

		if headResult.ContentLength == nil || *headResult.ContentLength < 2000 {
			continue // Too small to compress
		}

		// Download the file
		result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
			Bucket: aws.String(s3a.bucket),
			Key:    aws.String(dataKey),
		})
		if err != nil {
			cclog.Errorf("S3Archive Compress() > GetObject error: %v", err)
			continue
		}

		data, err := io.ReadAll(result.Body)
		result.Body.Close()
		if err != nil {
			cclog.Errorf("S3Archive Compress() > read error: %v", err)
			continue
		}

		// Compress the data
		var compressedBuf bytes.Buffer
		gzipWriter := gzip.NewWriter(&compressedBuf)
		if _, err := gzipWriter.Write(data); err != nil {
			cclog.Errorf("S3Archive Compress() > gzip write error: %v", err)
			gzipWriter.Close()
			continue
		}
		gzipWriter.Close()

		// Upload compressed file
		compressedKey := getS3Key(job, "data.json.gz")
		_, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
			Bucket: aws.String(s3a.bucket),
			Key:    aws.String(compressedKey),
			Body:   bytes.NewReader(compressedBuf.Bytes()),
		})
		if err != nil {
			cclog.Errorf("S3Archive Compress() > PutObject error: %v", err)
			continue
		}

		// Delete uncompressed file
		_, err = s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
			Bucket: aws.String(s3a.bucket),
			Key:    aws.String(dataKey),
		})
		if err != nil {
			cclog.Errorf("S3Archive Compress() > delete error: %v", err)
		}

		cnt++
	}

	cclog.Infof("Compression Service - %d files in S3 took %s", cnt, time.Since(start))
}

func (s3a *S3Archive) CompressLast(starttime int64) int64 {
	ctx := context.Background()
	compressKey := "compress.txt"

	// Try to read existing compress.txt
	result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(compressKey),
	})

	var last int64
	if err == nil {
		b, _ := io.ReadAll(result.Body)
		result.Body.Close()
		last, err = strconv.ParseInt(strings.TrimSuffix(string(b), "\n"), 10, 64)
		if err != nil {
			cclog.Errorf("S3Archive CompressLast() > parse error: %v", err)
			last = starttime
		}
	} else {
		last = starttime
	}

	cclog.Infof("S3Archive CompressLast() - start %d last %d", starttime, last)

	// Write new timestamp
	newValue := fmt.Sprintf("%d", starttime)
	_, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s3a.bucket),
		Key:    aws.String(compressKey),
		Body:   strings.NewReader(newValue),
	})
	if err != nil {
		cclog.Errorf("S3Archive CompressLast() > PutObject error: %v", err)
	}

	return last
}
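// CompressLast keeps a single compress.txt object at the bucket root as a
// high-water mark for the compression service. A sketch of how a periodic
// task might use it (the job lookup is hypothetical and not shown):
//
//	last := s3a.CompressLast(time.Now().Unix()) // previous run's timestamp
//	// select jobs whose start time falls between 'last' and now, then:
//	s3a.Compress(jobs)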
func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
	ch := make(chan JobContainer)

	go func() {
		ctx := context.Background()
		defer close(ch)

		for _, cluster := range s3a.clusters {
			prefix := cluster + "/"
			paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
				Bucket: aws.String(s3a.bucket),
				Prefix: aws.String(prefix),
			})

			for paginator.HasMorePages() {
				page, err := paginator.NextPage(ctx)
				if err != nil {
					cclog.Fatalf("S3Archive Iter() > list error: %s", err.Error())
				}

				for _, obj := range page.Contents {
					if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
						continue
					}

					// Load job metadata
					result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
						Bucket: aws.String(s3a.bucket),
						Key:    obj.Key,
					})
					if err != nil {
						cclog.Errorf("S3Archive Iter() > GetObject meta error: %v", err)
						continue
					}

					b, err := io.ReadAll(result.Body)
					result.Body.Close()
					if err != nil {
						cclog.Errorf("S3Archive Iter() > read meta error: %v", err)
						continue
					}

					job, err := DecodeJobMeta(bytes.NewReader(b))
					if err != nil {
						cclog.Errorf("S3Archive Iter() > decode meta error: %v", err)
						continue
					}

					if loadMetricData {
						jobData, err := s3a.LoadJobData(job)
						if err != nil {
							cclog.Errorf("S3Archive Iter() > load data error: %v", err)
							ch <- JobContainer{Meta: job, Data: nil}
						} else {
							ch <- JobContainer{Meta: job, Data: &jobData}
						}
					} else {
						ch <- JobContainer{Meta: job, Data: nil}
					}
				}
			}
		}
	}()

	return ch
}
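// Illustrative consumer of Iter; the loop body is an example, not part of
// this backend:
//
//	for jc := range s3a.Iter(false) {
//		fmt.Printf("cluster %s job %d\n", jc.Meta.Cluster, jc.Meta.JobID)
//	}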