From c920c57f5d7d88550f161a95fd51b62bd24ff2a5 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 7 Feb 2026 10:51:56 +0100 Subject: [PATCH] Add parquet file job archiving target --- go.mod | 11 +- go.sum | 13 ++ internal/taskmanager/retentionService.go | 94 ++++++++++ internal/taskmanager/taskManager.go | 21 ++- pkg/archive/ConfigSchema.go | 39 +++- pkg/archive/parquet/convert.go | 116 ++++++++++++ pkg/archive/parquet/schema.go | 32 ++++ pkg/archive/parquet/target.go | 100 ++++++++++ pkg/archive/parquet/writer.go | 113 ++++++++++++ pkg/archive/parquet/writer_test.go | 225 +++++++++++++++++++++++ 10 files changed, 755 insertions(+), 9 deletions(-) create mode 100644 pkg/archive/parquet/convert.go create mode 100644 pkg/archive/parquet/schema.go create mode 100644 pkg/archive/parquet/target.go create mode 100644 pkg/archive/parquet/writer.go create mode 100644 pkg/archive/parquet/writer_test.go diff --git a/go.mod b/go.mod index da712da9..af27227a 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/ClusterCockpit/cc-backend -go 1.24.0 - -toolchain go1.24.1 +go 1.24.9 tool ( github.com/99designs/gqlgen @@ -47,6 +45,7 @@ require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 // indirect @@ -98,6 +97,10 @@ require ( github.com/nats-io/nkeys v0.4.12 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/oapi-codegen/runtime v1.1.1 // indirect + github.com/parquet-go/bitpack v1.0.0 // indirect + github.com/parquet-go/jsonlite v1.0.0 // indirect + github.com/parquet-go/parquet-go v0.27.0 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/common v0.67.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect @@ -106,6 +109,7 @@ require ( github.com/stmcginnis/gofish v0.20.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/swaggo/files v1.0.1 // indirect + github.com/twpayne/go-geom v1.6.1 // indirect github.com/urfave/cli/v2 v2.27.7 // indirect github.com/urfave/cli/v3 v3.6.1 // indirect github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 // indirect @@ -118,6 +122,7 @@ require ( golang.org/x/sys v0.39.0 // indirect golang.org/x/text v0.32.0 // indirect golang.org/x/tools v0.40.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/go.sum b/go.sum index 43331fce..a9cb9ddb 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e h1:4dAU9FXIyQktp github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/andybalholm/cascadia v1.3.3 
h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM= github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA= github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM= @@ -238,6 +240,14 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/parquet-go/bitpack v1.0.0 h1:AUqzlKzPPXf2bCdjfj4sTeacrUwsT7NlcYDMUQxPcQA= +github.com/parquet-go/bitpack v1.0.0/go.mod h1:XnVk9TH+O40eOOmvpAVZ7K2ocQFrQwysLMnc6M/8lgs= +github.com/parquet-go/jsonlite v1.0.0 h1:87QNdi56wOfsE5bdgas0vRzHPxfJgzrXGml1zZdd7VU= +github.com/parquet-go/jsonlite v1.0.0/go.mod h1:nDjpkpL4EOtqs6NQugUsi0Rleq9sW/OtC1NnZEnxzF0= +github.com/parquet-go/parquet-go v0.27.0 h1:vHWK2xaHbj+v1DYps03yDRpEsdtOeKbhiXUaixoPb3g= +github.com/parquet-go/parquet-go v0.27.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= @@ -285,6 +295,8 @@ github.com/swaggo/http-swagger v1.3.4 h1:q7t/XLx0n15H1Q9/tk3Y9L4n210XzJF5WtnDX64 github.com/swaggo/http-swagger v1.3.4/go.mod h1:9dAh0unqMBAlbp1uE2Uc2mQTxNMU/ha4UbucIg1MFkQ= github.com/swaggo/swag v1.16.6 h1:qBNcx53ZaX+M5dxVyTrgQ0PJ/ACK+NzhwcbieTt+9yI= github.com/swaggo/swag v1.16.6/go.mod h1:ngP2etMK5a0P3QBizic5MEwpRmluJZPHjXcMoj4Xesg= +github.com/twpayne/go-geom v1.6.1 h1:iLE+Opv0Ihm/ABIcvQFGIiFBXd76oBIar9drAwHFhR4= +github.com/twpayne/go-geom v1.6.1/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028= github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU= github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4= github.com/urfave/cli/v3 v3.6.1 h1:j8Qq8NyUawj/7rTYdBGrxcH7A/j7/G8Q5LhWEW4G3Mo= @@ -293,6 +305,7 @@ github.com/vektah/gqlparser/v2 v2.5.31 h1:YhWGA1mfTjID7qJhd1+Vxhpk5HTgydrGU9IgkW github.com/vektah/gqlparser/v2 v2.5.31/go.mod h1:c1I28gSOVNzlfc4WuDlqU7voQnsqI6OG2amkBAFmgts= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/internal/taskmanager/retentionService.go b/internal/taskmanager/retentionService.go index 5678cd14..453d10bc 100644 --- a/internal/taskmanager/retentionService.go +++ b/internal/taskmanager/retentionService.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/pkg/archive" + pqarchive 
"github.com/ClusterCockpit/cc-backend/pkg/archive/parquet" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/go-co-op/gocron/v2" ) @@ -66,3 +67,96 @@ func RegisterRetentionMoveService(age int, includeDB bool, location string, omit } })) } + +func RegisterRetentionParquetService(retention Retention) { + cclog.Info("Register retention parquet service") + + maxFileSizeMB := retention.MaxFileSizeMB + if maxFileSizeMB <= 0 { + maxFileSizeMB = 512 + } + + var target pqarchive.ParquetTarget + var err error + + switch retention.TargetKind { + case "s3": + target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{ + Endpoint: retention.TargetEndpoint, + Bucket: retention.TargetBucket, + AccessKey: retention.TargetAccessKey, + SecretKey: retention.TargetSecretKey, + Region: retention.TargetRegion, + UsePathStyle: retention.TargetUsePathStyle, + }) + default: + target, err = pqarchive.NewFileTarget(retention.TargetPath) + } + + if err != nil { + cclog.Errorf("Parquet retention: failed to create target: %v", err) + return + } + + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))), + gocron.NewTask( + func() { + startTime := time.Now().Unix() - int64(retention.Age*24*3600) + jobs, err := jobRepo.FindJobsBetween(0, startTime, retention.OmitTagged) + if err != nil { + cclog.Warnf("Parquet retention: error finding jobs: %v", err) + return + } + if len(jobs) == 0 { + return + } + + cclog.Infof("Parquet retention: processing %d jobs", len(jobs)) + ar := archive.GetHandle() + pw := pqarchive.NewParquetWriter(target, maxFileSizeMB) + + for _, job := range jobs { + meta, err := ar.LoadJobMeta(job) + if err != nil { + cclog.Warnf("Parquet retention: load meta for job %d: %v", job.JobID, err) + continue + } + + data, err := ar.LoadJobData(job) + if err != nil { + cclog.Warnf("Parquet retention: load data for job %d: %v", job.JobID, err) + continue + } + + row, err := pqarchive.JobToParquetRow(meta, &data) + if err != nil { + cclog.Warnf("Parquet retention: convert job %d: %v", job.JobID, err) + continue + } + + if err := pw.AddJob(*row); err != nil { + cclog.Errorf("Parquet retention: add job %d to writer: %v", job.JobID, err) + continue + } + } + + if err := pw.Close(); err != nil { + cclog.Errorf("Parquet retention: close writer: %v", err) + return + } + + ar.CleanUp(jobs) + + if retention.IncludeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime, retention.OmitTagged) + if err != nil { + cclog.Errorf("Parquet retention: delete jobs from db: %v", err) + } else { + cclog.Infof("Parquet retention: removed %d jobs from db", cnt) + } + if err = jobRepo.Optimize(); err != nil { + cclog.Errorf("Parquet retention: db optimization error: %v", err) + } + } + })) +} diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index cbc4120f..e323557b 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -23,11 +23,20 @@ const ( // Retention defines the configuration for job retention policies. 
type Retention struct { - Policy string `json:"policy"` - Location string `json:"location"` - Age int `json:"age"` - IncludeDB bool `json:"includeDB"` - OmitTagged bool `json:"omitTagged"` + Policy string `json:"policy"` + Location string `json:"location"` + Age int `json:"age"` + IncludeDB bool `json:"includeDB"` + OmitTagged bool `json:"omitTagged"` + TargetKind string `json:"target-kind"` + TargetPath string `json:"target-path"` + TargetEndpoint string `json:"target-endpoint"` + TargetBucket string `json:"target-bucket"` + TargetAccessKey string `json:"target-access-key"` + TargetSecretKey string `json:"target-secret-key"` + TargetRegion string `json:"target-region"` + TargetUsePathStyle bool `json:"target-use-path-style"` + MaxFileSizeMB int `json:"max-file-size-mb"` } // CronFrequency defines the execution intervals for various background workers. @@ -87,6 +96,8 @@ func initArchiveServices(config json.RawMessage) { cfg.Retention.IncludeDB, cfg.Retention.Location, cfg.Retention.OmitTagged) + case "parquet": + RegisterRetentionParquetService(cfg.Retention) } if cfg.Compression > 0 { diff --git a/pkg/archive/ConfigSchema.go b/pkg/archive/ConfigSchema.go index aebcf37b..db568200 100644 --- a/pkg/archive/ConfigSchema.go +++ b/pkg/archive/ConfigSchema.go @@ -57,7 +57,7 @@ var configSchema = ` "policy": { "description": "Retention policy", "type": "string", - "enum": ["none", "delete", "move"] + "enum": ["none", "delete", "move", "parquet"] }, "include-db": { "description": "Also remove jobs from database", @@ -70,6 +70,43 @@ var configSchema = ` "location": { "description": "The target directory for retention. Only applicable for retention move.", "type": "string" + }, + "target-kind": { + "description": "Target storage kind for parquet retention: file or s3", + "type": "string", + "enum": ["file", "s3"] + }, + "target-path": { + "description": "Target directory path for parquet file storage", + "type": "string" + }, + "target-endpoint": { + "description": "S3 endpoint URL for parquet target", + "type": "string" + }, + "target-bucket": { + "description": "S3 bucket name for parquet target", + "type": "string" + }, + "target-access-key": { + "description": "S3 access key for parquet target", + "type": "string" + }, + "target-secret-key": { + "description": "S3 secret key for parquet target", + "type": "string" + }, + "target-region": { + "description": "S3 region for parquet target", + "type": "string" + }, + "target-use-path-style": { + "description": "Use path-style S3 URLs for parquet target", + "type": "boolean" + }, + "max-file-size-mb": { + "description": "Maximum parquet file size in MB before splitting", + "type": "integer" } }, "required": ["policy"] diff --git a/pkg/archive/parquet/convert.go b/pkg/archive/parquet/convert.go new file mode 100644 index 00000000..ceaa3f2f --- /dev/null +++ b/pkg/archive/parquet/convert.go @@ -0,0 +1,116 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +// JobToParquetRow converts job metadata and metric data into a flat ParquetJobRow. +// Nested fields are marshaled to JSON; metric data is gzip-compressed JSON. 
+func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, error) { + resourcesJSON, err := json.Marshal(meta.Resources) + if err != nil { + return nil, fmt.Errorf("marshal resources: %w", err) + } + + var statisticsJSON []byte + if meta.Statistics != nil { + statisticsJSON, err = json.Marshal(meta.Statistics) + if err != nil { + return nil, fmt.Errorf("marshal statistics: %w", err) + } + } + + var tagsJSON []byte + if len(meta.Tags) > 0 { + tagsJSON, err = json.Marshal(meta.Tags) + if err != nil { + return nil, fmt.Errorf("marshal tags: %w", err) + } + } + + var metaDataJSON []byte + if meta.MetaData != nil { + metaDataJSON, err = json.Marshal(meta.MetaData) + if err != nil { + return nil, fmt.Errorf("marshal metadata: %w", err) + } + } + + var footprintJSON []byte + if meta.Footprint != nil { + footprintJSON, err = json.Marshal(meta.Footprint) + if err != nil { + return nil, fmt.Errorf("marshal footprint: %w", err) + } + } + + var energyFootJSON []byte + if meta.EnergyFootprint != nil { + energyFootJSON, err = json.Marshal(meta.EnergyFootprint) + if err != nil { + return nil, fmt.Errorf("marshal energy footprint: %w", err) + } + } + + metricDataGz, err := compressJobData(data) + if err != nil { + return nil, fmt.Errorf("compress metric data: %w", err) + } + + return &ParquetJobRow{ + JobID: meta.JobID, + Cluster: meta.Cluster, + SubCluster: meta.SubCluster, + Partition: meta.Partition, + Project: meta.Project, + User: meta.User, + State: string(meta.State), + StartTime: meta.StartTime, + Duration: meta.Duration, + Walltime: meta.Walltime, + NumNodes: meta.NumNodes, + NumHWThreads: meta.NumHWThreads, + NumAcc: meta.NumAcc, + Exclusive: meta.Exclusive, + Energy: meta.Energy, + SMT: meta.SMT, + ResourcesJSON: resourcesJSON, + StatisticsJSON: statisticsJSON, + TagsJSON: tagsJSON, + MetaDataJSON: metaDataJSON, + FootprintJSON: footprintJSON, + EnergyFootJSON: energyFootJSON, + MetricDataGz: metricDataGz, + }, nil +} + +func compressJobData(data *schema.JobData) ([]byte, error) { + jsonBytes, err := json.Marshal(data) + if err != nil { + return nil, err + } + + var buf bytes.Buffer + gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) + if err != nil { + return nil, err + } + if _, err := gz.Write(jsonBytes); err != nil { + return nil, err + } + if err := gz.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} diff --git a/pkg/archive/parquet/schema.go b/pkg/archive/parquet/schema.go new file mode 100644 index 00000000..74f82599 --- /dev/null +++ b/pkg/archive/parquet/schema.go @@ -0,0 +1,32 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+
+package parquet
+
+// ParquetJobRow is the flat, single-row representation of an archived job.
+// Scalar metadata maps to native parquet columns; nested structures are
+// stored as JSON, and the full metric data as gzip-compressed JSON.
+type ParquetJobRow struct {
+	JobID          int64   `parquet:"job_id"`
+	Cluster        string  `parquet:"cluster"`
+	SubCluster     string  `parquet:"sub_cluster"`
+	Partition      string  `parquet:"partition,optional"`
+	Project        string  `parquet:"project"`
+	User           string  `parquet:"user"`
+	State          string  `parquet:"job_state"`
+	StartTime      int64   `parquet:"start_time"`
+	Duration       int32   `parquet:"duration"`
+	Walltime       int64   `parquet:"walltime"`
+	NumNodes       int32   `parquet:"num_nodes"`
+	NumHWThreads   int32   `parquet:"num_hwthreads"`
+	NumAcc         int32   `parquet:"num_acc"`
+	Exclusive      int32   `parquet:"exclusive"`
+	Energy         float64 `parquet:"energy"`
+	SMT            int32   `parquet:"smt"`
+	ResourcesJSON  []byte  `parquet:"resources_json"`
+	StatisticsJSON []byte  `parquet:"statistics_json,optional"`
+	TagsJSON       []byte  `parquet:"tags_json,optional"`
+	MetaDataJSON   []byte  `parquet:"meta_data_json,optional"`
+	FootprintJSON  []byte  `parquet:"footprint_json,optional"`
+	EnergyFootJSON []byte  `parquet:"energy_footprint_json,optional"`
+	MetricDataGz   []byte  `parquet:"metric_data_gz"`
+}
diff --git a/pkg/archive/parquet/target.go b/pkg/archive/parquet/target.go
new file mode 100644
index 00000000..0e8babc2
--- /dev/null
+++ b/pkg/archive/parquet/target.go
@@ -0,0 +1,107 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved. This file is part of cc-backend.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package parquet
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	awsconfig "github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+// ParquetTarget abstracts the destination for parquet file writes.
+type ParquetTarget interface {
+	WriteFile(name string, data []byte) error
+}
+
+// FileTarget writes parquet files to a local filesystem directory.
+type FileTarget struct {
+	path string
+}
+
+// NewFileTarget creates a FileTarget rooted at path, creating the directory if needed.
+func NewFileTarget(path string) (*FileTarget, error) {
+	if err := os.MkdirAll(path, 0o750); err != nil {
+		return nil, fmt.Errorf("create target directory: %w", err)
+	}
+	return &FileTarget{path: path}, nil
+}
+
+// WriteFile writes data to a file called name inside the target directory.
+func (ft *FileTarget) WriteFile(name string, data []byte) error {
+	return os.WriteFile(filepath.Join(ft.path, name), data, 0o640)
+}
+
+// S3TargetConfig holds the configuration for an S3 parquet target.
+type S3TargetConfig struct {
+	Endpoint     string
+	Bucket       string
+	AccessKey    string
+	SecretKey    string
+	Region       string
+	UsePathStyle bool
+}
+
+// S3Target writes parquet files to an S3-compatible object store.
+type S3Target struct {
+	client *s3.Client
+	bucket string
+}
+
+// NewS3Target creates an S3Target for the given bucket, endpoint, and credentials.
+func NewS3Target(cfg S3TargetConfig) (*S3Target, error) {
+	if cfg.Bucket == "" {
+		return nil, fmt.Errorf("S3 target: empty bucket name")
+	}
+
+	region := cfg.Region
+	if region == "" {
+		region = "us-east-1"
+	}
+
+	// Use static credentials when configured; otherwise fall back to the
+	// default AWS credential chain (environment, shared config, IAM role).
+	loadOpts := []func(*awsconfig.LoadOptions) error{awsconfig.WithRegion(region)}
+	if cfg.AccessKey != "" || cfg.SecretKey != "" {
+		loadOpts = append(loadOpts, awsconfig.WithCredentialsProvider(
+			credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, ""),
+		))
+	}
+	awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(), loadOpts...)
+	if err != nil {
+		return nil, fmt.Errorf("S3 target: load AWS config: %w", err)
+	}
+
+	opts := func(o *s3.Options) {
+		if cfg.Endpoint != "" {
+			o.BaseEndpoint = aws.String(cfg.Endpoint)
+		}
+		o.UsePathStyle = cfg.UsePathStyle
+	}
+
+	client := s3.NewFromConfig(awsCfg, opts)
+	return &S3Target{client: client, bucket: cfg.Bucket}, nil
+}
+
+// WriteFile uploads data to the configured bucket under the key name.
+func (st *S3Target) WriteFile(name string, data []byte) error {
+	_, err := st.client.PutObject(context.Background(), &s3.PutObjectInput{
+		Bucket:      aws.String(st.bucket),
+		Key:         aws.String(name),
+		Body:        bytes.NewReader(data),
+		ContentType: aws.String("application/vnd.apache.parquet"),
+	})
+	if err != nil {
+		return fmt.Errorf("S3 target: put object %q: %w", name, err)
+	}
+	return nil
+}
diff --git a/pkg/archive/parquet/writer.go b/pkg/archive/parquet/writer.go
new file mode 100644
index 00000000..ab56cace
--- /dev/null
+++ b/pkg/archive/parquet/writer.go
@@ -0,0 +1,113 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved. This file is part of cc-backend.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package parquet
+
+import (
+	"bytes"
+	"fmt"
+	"time"
+
+	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
+	pq "github.com/parquet-go/parquet-go"
+)
+
+// ParquetWriter batches ParquetJobRows and flushes them to a target
+// when the estimated size exceeds maxSizeBytes.
+type ParquetWriter struct {
+	target       ParquetTarget
+	maxSizeBytes int64
+	rows         []ParquetJobRow
+	currentSize  int64
+	fileCounter  int
+	datePrefix   string
+}
+
+// NewParquetWriter creates a new writer that flushes batches to the given target.
+// maxSizeMB sets the approximate maximum size per parquet file in megabytes.
+func NewParquetWriter(target ParquetTarget, maxSizeMB int) *ParquetWriter {
+	return &ParquetWriter{
+		target:       target,
+		maxSizeBytes: int64(maxSizeMB) * 1024 * 1024,
+		datePrefix:   time.Now().Format("2006-01-02"),
+	}
+}
+
+// AddJob adds a row to the current batch. If the estimated batch size
+// exceeds the configured maximum, the batch is flushed to the target first.
+func (pw *ParquetWriter) AddJob(row ParquetJobRow) error {
+	rowSize := estimateRowSize(&row)
+
+	if pw.currentSize+rowSize > pw.maxSizeBytes && len(pw.rows) > 0 {
+		if err := pw.Flush(); err != nil {
+			return err
+		}
+	}
+
+	pw.rows = append(pw.rows, row)
+	pw.currentSize += rowSize
+	return nil
+}
+
+// Flush writes the current batch to a parquet file on the target.
+func (pw *ParquetWriter) Flush() error {
+	if len(pw.rows) == 0 {
+		return nil
+	}
+
+	pw.fileCounter++
+	fileName := fmt.Sprintf("cc-archive-%s-%03d.parquet", pw.datePrefix, pw.fileCounter)
+
+	data, err := writeParquetBytes(pw.rows)
+	if err != nil {
+		return fmt.Errorf("write parquet buffer: %w", err)
+	}
+
+	if err := pw.target.WriteFile(fileName, data); err != nil {
+		return fmt.Errorf("write parquet file %q: %w", fileName, err)
+	}
+
+	cclog.Infof("Parquet retention: wrote %s (%d jobs, %d bytes)", fileName, len(pw.rows), len(data))
+	pw.rows = pw.rows[:0]
+	pw.currentSize = 0
+	return nil
+}
+
+// Close flushes any remaining rows and finalizes the writer.
+func (pw *ParquetWriter) Close() error {
+	return pw.Flush()
+}
+
+func writeParquetBytes(rows []ParquetJobRow) ([]byte, error) {
+	var buf bytes.Buffer
+
+	writer := pq.NewGenericWriter[ParquetJobRow](&buf,
+		pq.Compression(&pq.Snappy),
+	)
+
+	if _, err := writer.Write(rows); err != nil {
+		return nil, err
+	}
+	if err := writer.Close(); err != nil {
+		return nil, err
+	}
+
+	return buf.Bytes(), nil
+}
+
+func estimateRowSize(row *ParquetJobRow) int64 {
+	// Fixed overhead for the numeric columns and per-row encoding; variable-length fields are added below.
+	size := int64(200)
+	size += int64(len(row.Cluster) + len(row.SubCluster) + len(row.Partition) +
+		len(row.Project) + len(row.User) + len(row.State))
+	size += int64(len(row.ResourcesJSON))
+	size += int64(len(row.StatisticsJSON))
+	size += int64(len(row.TagsJSON))
+	size += int64(len(row.MetaDataJSON))
+	size += int64(len(row.FootprintJSON))
+	size += int64(len(row.EnergyFootJSON))
+	size += int64(len(row.MetricDataGz))
+	return size
+}
diff --git a/pkg/archive/parquet/writer_test.go b/pkg/archive/parquet/writer_test.go
new file mode 100644
index 00000000..6baaa527
--- /dev/null
+++ b/pkg/archive/parquet/writer_test.go
@@ -0,0 +1,232 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved. This file is part of cc-backend.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package parquet
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/json"
+	"io"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+
+	"github.com/ClusterCockpit/cc-lib/v2/schema"
+	pq "github.com/parquet-go/parquet-go"
+)
+
+// memTarget collects written files in memory for testing.
+type memTarget struct {
+	mu    sync.Mutex
+	files map[string][]byte
+}
+
+func newMemTarget() *memTarget {
+	return &memTarget{files: make(map[string][]byte)}
+}
+
+func (m *memTarget) WriteFile(name string, data []byte) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.files[name] = append([]byte(nil), data...)
+ return nil +} + +func makeTestJob(jobID int64) (*schema.Job, *schema.JobData) { + meta := &schema.Job{ + JobID: jobID, + Cluster: "testcluster", + SubCluster: "sc0", + Project: "testproject", + User: "testuser", + State: schema.JobStateCompleted, + StartTime: 1700000000, + Duration: 3600, + Walltime: 7200, + NumNodes: 2, + NumHWThreads: 16, + Exclusive: 1, + SMT: 1, + Resources: []*schema.Resource{ + {Hostname: "node001"}, + {Hostname: "node002"}, + }, + } + + data := schema.JobData{ + "cpu_load": { + schema.MetricScopeNode: &schema.JobMetric{ + Unit: schema.Unit{Base: ""}, + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "node001", + Data: []schema.Float{1.0, 2.0, 3.0}, + }, + }, + }, + }, + } + + return meta, &data +} + +func TestJobToParquetRowConversion(t *testing.T) { + meta, data := makeTestJob(1001) + meta.Tags = []*schema.Tag{{Type: "test", Name: "tag1"}} + meta.MetaData = map[string]string{"key": "value"} + + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("JobToParquetRow: %v", err) + } + + if row.JobID != 1001 { + t.Errorf("JobID = %d, want 1001", row.JobID) + } + if row.Cluster != "testcluster" { + t.Errorf("Cluster = %q, want %q", row.Cluster, "testcluster") + } + if row.User != "testuser" { + t.Errorf("User = %q, want %q", row.User, "testuser") + } + if row.State != "completed" { + t.Errorf("State = %q, want %q", row.State, "completed") + } + if row.NumNodes != 2 { + t.Errorf("NumNodes = %d, want 2", row.NumNodes) + } + + // Verify resources JSON + var resources []*schema.Resource + if err := json.Unmarshal(row.ResourcesJSON, &resources); err != nil { + t.Fatalf("unmarshal resources: %v", err) + } + if len(resources) != 2 { + t.Errorf("resources len = %d, want 2", len(resources)) + } + + // Verify tags JSON + var tags []*schema.Tag + if err := json.Unmarshal(row.TagsJSON, &tags); err != nil { + t.Fatalf("unmarshal tags: %v", err) + } + if len(tags) != 1 || tags[0].Name != "tag1" { + t.Errorf("tags = %v, want [{test tag1}]", tags) + } + + // Verify metric data is gzip-compressed valid JSON + gz, err := gzip.NewReader(bytes.NewReader(row.MetricDataGz)) + if err != nil { + t.Fatalf("gzip reader: %v", err) + } + decompressed, err := io.ReadAll(gz) + if err != nil { + t.Fatalf("gzip read: %v", err) + } + var jobData schema.JobData + if err := json.Unmarshal(decompressed, &jobData); err != nil { + t.Fatalf("unmarshal metric data: %v", err) + } + if _, ok := jobData["cpu_load"]; !ok { + t.Error("metric data missing cpu_load key") + } +} + +func TestParquetWriterSingleBatch(t *testing.T) { + target := newMemTarget() + pw := NewParquetWriter(target, 512) + + for i := int64(0); i < 5; i++ { + meta, data := makeTestJob(i) + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("convert job %d: %v", i, err) + } + if err := pw.AddJob(*row); err != nil { + t.Fatalf("add job %d: %v", i, err) + } + } + + if err := pw.Close(); err != nil { + t.Fatalf("close: %v", err) + } + + if len(target.files) != 1 { + t.Fatalf("expected 1 file, got %d", len(target.files)) + } + + // Verify the parquet file is readable + for name, data := range target.files { + file := bytes.NewReader(data) + pf, err := pq.OpenFile(file, int64(len(data))) + if err != nil { + t.Fatalf("open parquet %s: %v", name, err) + } + if pf.NumRows() != 5 { + t.Errorf("parquet rows = %d, want 5", pf.NumRows()) + } + } +} + +func TestParquetWriterBatching(t *testing.T) { + target := newMemTarget() + // Use a very small max size to force multiple files + pw := NewParquetWriter(target, 0) 
// size passed to the constructor is irrelevant; overridden below
+	pw.maxSizeBytes = 1 // force a flush whenever a row is already buffered
+
+	for i := int64(0); i < 3; i++ {
+		meta, data := makeTestJob(i)
+		row, err := JobToParquetRow(meta, data)
+		if err != nil {
+			t.Fatalf("convert job %d: %v", i, err)
+		}
+		if err := pw.AddJob(*row); err != nil {
+			t.Fatalf("add job %d: %v", i, err)
+		}
+	}
+
+	if err := pw.Close(); err != nil {
+		t.Fatalf("close: %v", err)
+	}
+
+	// With maxSizeBytes=1, each AddJob flushes the previously buffered row,
+	// so the three jobs must end up in multiple files
+	if len(target.files) < 2 {
+		t.Errorf("expected multiple files due to batching, got %d", len(target.files))
+	}
+
+	// Verify all files are valid parquet
+	for name, data := range target.files {
+		file := bytes.NewReader(data)
+		_, err := pq.OpenFile(file, int64(len(data)))
+		if err != nil {
+			t.Errorf("invalid parquet file %s: %v", name, err)
+		}
+	}
+}
+
+func TestFileTarget(t *testing.T) {
+	dir := t.TempDir()
+	ft, err := NewFileTarget(dir)
+	if err != nil {
+		t.Fatalf("NewFileTarget: %v", err)
+	}
+
+	testData := []byte("test parquet data")
+	if err := ft.WriteFile("test.parquet", testData); err != nil {
+		t.Fatalf("WriteFile: %v", err)
+	}
+
+	// Verify the file exists on disk with the expected content.
+	got, err := os.ReadFile(filepath.Join(dir, "test.parquet"))
+	if err != nil {
+		t.Fatalf("read back: %v", err)
+	}
+	if !bytes.Equal(got, testData) {
+		t.Errorf("file content = %q, want %q", got, testData)
+	}
+}
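
Example retention configuration (illustrative, not part of the patch): the key names below follow the JSON schema and struct tags added above; the enclosing config object, endpoint URL, and values are placeholders for a concrete deployment.

	"retention": {
		"policy": "parquet",
		"age": 180,
		"target-kind": "s3",
		"target-endpoint": "https://s3.example.com",
		"target-bucket": "cc-job-archive",
		"target-access-key": "<access key>",
		"target-secret-key": "<secret key>",
		"target-region": "us-east-1",
		"target-use-path-style": true,
		"max-file-size-mb": 512
	}

For a local filesystem target, set "target-kind": "file" and "target-path" instead of the S3 keys. "max-file-size-mb" defaults to 512 when omitted or non-positive.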
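
Reading the archived files back is out of scope for this patch; the following is a minimal sketch of the round trip, assuming the ParquetJobRow schema and the gzip+JSON metric encoding introduced above. parquet.ReadFile is parquet-go's generic reader; the file name is a placeholder following the cc-archive-<date>-<counter>.parquet pattern used by ParquetWriter.Flush.

	package main

	import (
		"bytes"
		"compress/gzip"
		"encoding/json"
		"fmt"
		"io"
		"log"

		ccparquet "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet"
		"github.com/ClusterCockpit/cc-lib/v2/schema"
		pq "github.com/parquet-go/parquet-go"
	)

	func main() {
		// Read all rows of one archive file into the flat row type.
		rows, err := pq.ReadFile[ccparquet.ParquetJobRow]("cc-archive-2026-02-07-001.parquet")
		if err != nil {
			log.Fatalf("read parquet: %v", err)
		}

		for _, row := range rows {
			// metric_data_gz holds the gzip-compressed schema.JobData JSON
			// produced by compressJobData.
			gz, err := gzip.NewReader(bytes.NewReader(row.MetricDataGz))
			if err != nil {
				log.Fatalf("gzip reader for job %d: %v", row.JobID, err)
			}
			raw, err := io.ReadAll(gz)
			gz.Close()
			if err != nil {
				log.Fatalf("decompress job %d: %v", row.JobID, err)
			}

			var data schema.JobData
			if err := json.Unmarshal(raw, &data); err != nil {
				log.Fatalf("decode metric data for job %d: %v", row.JobID, err)
			}
			fmt.Printf("job %d on %s: %d metrics\n", row.JobID, row.Cluster, len(data))
		}
	}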