Add parquet file job archiving target

2026-02-07 10:51:56 +01:00
parent a8194de492
commit c920c57f5d
10 changed files with 755 additions and 9 deletions

View File

@@ -57,7 +57,7 @@ var configSchema = `
"policy": {
"description": "Retention policy",
"type": "string",
"enum": ["none", "delete", "move"]
"enum": ["none", "delete", "move", "parquet"]
},
"include-db": {
"description": "Also remove jobs from database",
@@ -70,6 +70,43 @@ var configSchema = `
"location": {
"description": "The target directory for retention. Only applicable for retention move.",
"type": "string"
},
"target-kind": {
"description": "Target storage kind for parquet retention: file or s3",
"type": "string",
"enum": ["file", "s3"]
},
"target-path": {
"description": "Target directory path for parquet file storage",
"type": "string"
},
"target-endpoint": {
"description": "S3 endpoint URL for parquet target",
"type": "string"
},
"target-bucket": {
"description": "S3 bucket name for parquet target",
"type": "string"
},
"target-access-key": {
"description": "S3 access key for parquet target",
"type": "string"
},
"target-secret-key": {
"description": "S3 secret key for parquet target",
"type": "string"
},
"target-region": {
"description": "S3 region for parquet target",
"type": "string"
},
"target-use-path-style": {
"description": "Use path-style S3 URLs for parquet target",
"type": "boolean"
},
"max-file-size-mb": {
"description": "Maximum parquet file size in MB before splitting",
"type": "integer"
}
},
"required": ["policy"]

View File

@@ -0,0 +1,116 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package parquet
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// JobToParquetRow converts job metadata and metric data into a flat ParquetJobRow.
// Nested fields are marshaled to JSON; metric data is gzip-compressed JSON.
func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, error) {
resourcesJSON, err := json.Marshal(meta.Resources)
if err != nil {
return nil, fmt.Errorf("marshal resources: %w", err)
}
var statisticsJSON []byte
if meta.Statistics != nil {
statisticsJSON, err = json.Marshal(meta.Statistics)
if err != nil {
return nil, fmt.Errorf("marshal statistics: %w", err)
}
}
var tagsJSON []byte
if len(meta.Tags) > 0 {
tagsJSON, err = json.Marshal(meta.Tags)
if err != nil {
return nil, fmt.Errorf("marshal tags: %w", err)
}
}
var metaDataJSON []byte
if meta.MetaData != nil {
metaDataJSON, err = json.Marshal(meta.MetaData)
if err != nil {
return nil, fmt.Errorf("marshal metadata: %w", err)
}
}
var footprintJSON []byte
if meta.Footprint != nil {
footprintJSON, err = json.Marshal(meta.Footprint)
if err != nil {
return nil, fmt.Errorf("marshal footprint: %w", err)
}
}
var energyFootJSON []byte
if meta.EnergyFootprint != nil {
energyFootJSON, err = json.Marshal(meta.EnergyFootprint)
if err != nil {
return nil, fmt.Errorf("marshal energy footprint: %w", err)
}
}
metricDataGz, err := compressJobData(data)
if err != nil {
return nil, fmt.Errorf("compress metric data: %w", err)
}
return &ParquetJobRow{
JobID: meta.JobID,
Cluster: meta.Cluster,
SubCluster: meta.SubCluster,
Partition: meta.Partition,
Project: meta.Project,
User: meta.User,
State: string(meta.State),
StartTime: meta.StartTime,
Duration: meta.Duration,
Walltime: meta.Walltime,
NumNodes: meta.NumNodes,
NumHWThreads: meta.NumHWThreads,
NumAcc: meta.NumAcc,
Exclusive: meta.Exclusive,
Energy: meta.Energy,
SMT: meta.SMT,
ResourcesJSON: resourcesJSON,
StatisticsJSON: statisticsJSON,
TagsJSON: tagsJSON,
MetaDataJSON: metaDataJSON,
FootprintJSON: footprintJSON,
EnergyFootJSON: energyFootJSON,
MetricDataGz: metricDataGz,
}, nil
}
func compressJobData(data *schema.JobData) ([]byte, error) {
jsonBytes, err := json.Marshal(data)
if err != nil {
return nil, err
}
var buf bytes.Buffer
gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return nil, err
}
if _, err := gz.Write(jsonBytes); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
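
For consumers of the archive, the metric data can be recovered by reversing the gzip/JSON encoding. A minimal sketch of the counterpart is shown below; decompressJobData is a hypothetical helper, not part of this commit, and uses only the packages already imported above.

// decompressJobData is a hypothetical counterpart to compressJobData, shown
// only to illustrate the round trip; it is not part of this commit.
func decompressJobData(metricDataGz []byte) (*schema.JobData, error) {
	gz, err := gzip.NewReader(bytes.NewReader(metricDataGz))
	if err != nil {
		return nil, fmt.Errorf("open gzip reader: %w", err)
	}
	defer gz.Close()
	var data schema.JobData
	if err := json.NewDecoder(gz).Decode(&data); err != nil {
		return nil, fmt.Errorf("decode job data: %w", err)
	}
	return &data, nil
}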

View File

@@ -0,0 +1,32 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package parquet
type ParquetJobRow struct {
JobID int64 `parquet:"job_id"`
Cluster string `parquet:"cluster"`
SubCluster string `parquet:"sub_cluster"`
Partition string `parquet:"partition,optional"`
Project string `parquet:"project"`
User string `parquet:"user"`
State string `parquet:"job_state"`
StartTime int64 `parquet:"start_time"`
Duration int32 `parquet:"duration"`
Walltime int64 `parquet:"walltime"`
NumNodes int32 `parquet:"num_nodes"`
NumHWThreads int32 `parquet:"num_hwthreads"`
NumAcc int32 `parquet:"num_acc"`
Exclusive int32 `parquet:"exclusive"`
Energy float64 `parquet:"energy"`
SMT int32 `parquet:"smt"`
ResourcesJSON []byte `parquet:"resources_json"`
StatisticsJSON []byte `parquet:"statistics_json,optional"`
TagsJSON []byte `parquet:"tags_json,optional"`
MetaDataJSON []byte `parquet:"meta_data_json,optional"`
FootprintJSON []byte `parquet:"footprint_json,optional"`
EnergyFootJSON []byte `parquet:"energy_footprint_json,optional"`
MetricDataGz []byte `parquet:"metric_data_gz"`
}
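
parquet-go derives the file schema from these struct tags: fields tagged "optional" become nullable columns, all others are required. A small hedged snippet for inspecting the derived schema, assuming the pq import alias used elsewhere in this commit:

// Hypothetical snippet: print the parquet message type derived from the struct tags.
sch := pq.SchemaOf(new(ParquetJobRow))
fmt.Println(sch) // one column per struct field; "optional" fields are nullable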

View File

@@ -0,0 +1,100 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package parquet
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// ParquetTarget abstracts the destination for parquet file writes.
type ParquetTarget interface {
WriteFile(name string, data []byte) error
}
// FileTarget writes parquet files to a local filesystem directory.
type FileTarget struct {
path string
}
func NewFileTarget(path string) (*FileTarget, error) {
if err := os.MkdirAll(path, 0o750); err != nil {
return nil, fmt.Errorf("create target directory: %w", err)
}
return &FileTarget{path: path}, nil
}
func (ft *FileTarget) WriteFile(name string, data []byte) error {
return os.WriteFile(filepath.Join(ft.path, name), data, 0o640)
}
// S3TargetConfig holds the configuration for an S3 parquet target.
type S3TargetConfig struct {
Endpoint string
Bucket string
AccessKey string
SecretKey string
Region string
UsePathStyle bool
}
// S3Target writes parquet files to an S3-compatible object store.
type S3Target struct {
client *s3.Client
bucket string
}
func NewS3Target(cfg S3TargetConfig) (*S3Target, error) {
if cfg.Bucket == "" {
return nil, fmt.Errorf("S3 target: empty bucket name")
}
region := cfg.Region
if region == "" {
region = "us-east-1"
}
awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(),
awsconfig.WithRegion(region),
awsconfig.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, ""),
),
)
if err != nil {
return nil, fmt.Errorf("S3 target: load AWS config: %w", err)
}
opts := func(o *s3.Options) {
if cfg.Endpoint != "" {
o.BaseEndpoint = aws.String(cfg.Endpoint)
}
o.UsePathStyle = cfg.UsePathStyle
}
client := s3.NewFromConfig(awsCfg, opts)
return &S3Target{client: client, bucket: cfg.Bucket}, nil
}
func (st *S3Target) WriteFile(name string, data []byte) error {
_, err := st.client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(st.bucket),
Key: aws.String(name),
Body: bytes.NewReader(data),
ContentType: aws.String("application/vnd.apache.parquet"),
})
if err != nil {
return fmt.Errorf("S3 target: put object %q: %w", name, err)
}
return nil
}
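
The retention code elsewhere in this commit is expected to pick one of the two implementations based on the target-kind setting; that wiring is not shown in this hunk. A hedged sketch using only the constructors above (newParquetTarget and its parameters are illustrative names):

// newParquetTarget is a hypothetical helper mapping the retention config
// onto one of the two ParquetTarget implementations.
func newParquetTarget(kind, path string, s3cfg S3TargetConfig) (ParquetTarget, error) {
	switch kind {
	case "file":
		return NewFileTarget(path)
	case "s3":
		return NewS3Target(s3cfg)
	default:
		return nil, fmt.Errorf("unknown parquet target kind %q", kind)
	}
}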

View File

@@ -0,0 +1,113 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package parquet
import (
"bytes"
"fmt"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
pq "github.com/parquet-go/parquet-go"
)
// ParquetWriter batches ParquetJobRows and flushes them to a target
// when the estimated size exceeds maxSizeBytes.
type ParquetWriter struct {
target ParquetTarget
maxSizeBytes int64
rows []ParquetJobRow
currentSize int64
fileCounter int
datePrefix string
}
// NewParquetWriter creates a new writer that flushes batches to the given target.
// maxSizeMB sets the approximate maximum size per parquet file in megabytes.
func NewParquetWriter(target ParquetTarget, maxSizeMB int) *ParquetWriter {
return &ParquetWriter{
target: target,
maxSizeBytes: int64(maxSizeMB) * 1024 * 1024,
datePrefix: time.Now().Format("2006-01-02"),
}
}
// AddJob adds a row to the current batch. If the estimated batch size
// exceeds the configured maximum, the batch is flushed to the target first.
func (pw *ParquetWriter) AddJob(row ParquetJobRow) error {
rowSize := estimateRowSize(&row)
if pw.currentSize+rowSize > pw.maxSizeBytes && len(pw.rows) > 0 {
if err := pw.Flush(); err != nil {
return err
}
}
pw.rows = append(pw.rows, row)
pw.currentSize += rowSize
return nil
}
// Flush writes the current batch to a parquet file on the target.
func (pw *ParquetWriter) Flush() error {
if len(pw.rows) == 0 {
return nil
}
pw.fileCounter++
fileName := fmt.Sprintf("cc-archive-%s-%03d.parquet", pw.datePrefix, pw.fileCounter)
data, err := writeParquetBytes(pw.rows)
if err != nil {
return fmt.Errorf("write parquet buffer: %w", err)
}
if err := pw.target.WriteFile(fileName, data); err != nil {
return fmt.Errorf("write parquet file %q: %w", fileName, err)
}
cclog.Infof("Parquet retention: wrote %s (%d jobs, %d bytes)", fileName, len(pw.rows), len(data))
pw.rows = pw.rows[:0]
pw.currentSize = 0
return nil
}
// Close flushes any remaining rows and finalizes the writer.
func (pw *ParquetWriter) Close() error {
return pw.Flush()
}
func writeParquetBytes(rows []ParquetJobRow) ([]byte, error) {
var buf bytes.Buffer
writer := pq.NewGenericWriter[ParquetJobRow](&buf,
pq.Compression(&pq.Snappy),
)
if _, err := writer.Write(rows); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func estimateRowSize(row *ParquetJobRow) int64 {
// Start from a fixed base estimate for the numeric fields and per-row overhead,
// then add the variable-length string and blob fields.
size := int64(200)
size += int64(len(row.Cluster) + len(row.SubCluster) + len(row.Partition) +
len(row.Project) + len(row.User) + len(row.State))
size += int64(len(row.ResourcesJSON))
size += int64(len(row.StatisticsJSON))
size += int64(len(row.TagsJSON))
size += int64(len(row.MetaDataJSON))
size += int64(len(row.FootprintJSON))
size += int64(len(row.EnergyFootJSON))
size += int64(len(row.MetricDataGz))
return size
}
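
Putting the pieces together, the intended archiving flow is: convert each expired job with JobToParquetRow, feed it to the writer via AddJob, and call Close at the end so the final partial batch is flushed. A hedged usage sketch; expiredJobs, loadJobData, and the target path are placeholders, not names from this commit:

// Hypothetical usage of the writer in a retention pass.
target, err := NewFileTarget("/var/lib/cc-backend/parquet-archive")
if err != nil {
	return err
}
pw := NewParquetWriter(target, 512) // split output files at roughly 512 MB
for _, job := range expiredJobs {   // expiredJobs, loadJobData: placeholders
	data, err := loadJobData(job)
	if err != nil {
		return err
	}
	row, err := JobToParquetRow(job, data)
	if err != nil {
		return err
	}
	if err := pw.AddJob(*row); err != nil {
		return err
	}
}
return pw.Close() // flushes any remaining rows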

View File

@@ -0,0 +1,225 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package parquet
import (
"bytes"
"compress/gzip"
"encoding/json"
"io"
"os"
"path/filepath"
"sync"
"testing"
"github.com/ClusterCockpit/cc-lib/v2/schema"
pq "github.com/parquet-go/parquet-go"
)
// memTarget collects written files in memory for testing.
type memTarget struct {
mu sync.Mutex
files map[string][]byte
}
func newMemTarget() *memTarget {
return &memTarget{files: make(map[string][]byte)}
}
func (m *memTarget) WriteFile(name string, data []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
m.files[name] = append([]byte(nil), data...)
return nil
}
func makeTestJob(jobID int64) (*schema.Job, *schema.JobData) {
meta := &schema.Job{
JobID: jobID,
Cluster: "testcluster",
SubCluster: "sc0",
Project: "testproject",
User: "testuser",
State: schema.JobStateCompleted,
StartTime: 1700000000,
Duration: 3600,
Walltime: 7200,
NumNodes: 2,
NumHWThreads: 16,
Exclusive: 1,
SMT: 1,
Resources: []*schema.Resource{
{Hostname: "node001"},
{Hostname: "node002"},
},
}
data := schema.JobData{
"cpu_load": {
schema.MetricScopeNode: &schema.JobMetric{
Unit: schema.Unit{Base: ""},
Timestep: 60,
Series: []schema.Series{
{
Hostname: "node001",
Data: []schema.Float{1.0, 2.0, 3.0},
},
},
},
},
}
return meta, &data
}
func TestJobToParquetRowConversion(t *testing.T) {
meta, data := makeTestJob(1001)
meta.Tags = []*schema.Tag{{Type: "test", Name: "tag1"}}
meta.MetaData = map[string]string{"key": "value"}
row, err := JobToParquetRow(meta, data)
if err != nil {
t.Fatalf("JobToParquetRow: %v", err)
}
if row.JobID != 1001 {
t.Errorf("JobID = %d, want 1001", row.JobID)
}
if row.Cluster != "testcluster" {
t.Errorf("Cluster = %q, want %q", row.Cluster, "testcluster")
}
if row.User != "testuser" {
t.Errorf("User = %q, want %q", row.User, "testuser")
}
if row.State != "completed" {
t.Errorf("State = %q, want %q", row.State, "completed")
}
if row.NumNodes != 2 {
t.Errorf("NumNodes = %d, want 2", row.NumNodes)
}
// Verify resources JSON
var resources []*schema.Resource
if err := json.Unmarshal(row.ResourcesJSON, &resources); err != nil {
t.Fatalf("unmarshal resources: %v", err)
}
if len(resources) != 2 {
t.Errorf("resources len = %d, want 2", len(resources))
}
// Verify tags JSON
var tags []*schema.Tag
if err := json.Unmarshal(row.TagsJSON, &tags); err != nil {
t.Fatalf("unmarshal tags: %v", err)
}
if len(tags) != 1 || tags[0].Name != "tag1" {
t.Errorf("tags = %v, want [{test tag1}]", tags)
}
// Verify metric data is gzip-compressed valid JSON
gz, err := gzip.NewReader(bytes.NewReader(row.MetricDataGz))
if err != nil {
t.Fatalf("gzip reader: %v", err)
}
decompressed, err := io.ReadAll(gz)
if err != nil {
t.Fatalf("gzip read: %v", err)
}
var jobData schema.JobData
if err := json.Unmarshal(decompressed, &jobData); err != nil {
t.Fatalf("unmarshal metric data: %v", err)
}
if _, ok := jobData["cpu_load"]; !ok {
t.Error("metric data missing cpu_load key")
}
}
func TestParquetWriterSingleBatch(t *testing.T) {
target := newMemTarget()
pw := NewParquetWriter(target, 512)
for i := int64(0); i < 5; i++ {
meta, data := makeTestJob(i)
row, err := JobToParquetRow(meta, data)
if err != nil {
t.Fatalf("convert job %d: %v", i, err)
}
if err := pw.AddJob(*row); err != nil {
t.Fatalf("add job %d: %v", i, err)
}
}
if err := pw.Close(); err != nil {
t.Fatalf("close: %v", err)
}
if len(target.files) != 1 {
t.Fatalf("expected 1 file, got %d", len(target.files))
}
// Verify the parquet file is readable
for name, data := range target.files {
file := bytes.NewReader(data)
pf, err := pq.OpenFile(file, int64(len(data)))
if err != nil {
t.Fatalf("open parquet %s: %v", name, err)
}
if pf.NumRows() != 5 {
t.Errorf("parquet rows = %d, want 5", pf.NumRows())
}
}
}
func TestParquetWriterBatching(t *testing.T) {
target := newMemTarget()
// Force a tiny size limit so every AddJob after the first flushes the
// previous batch, producing multiple output files
pw := NewParquetWriter(target, 0)
pw.maxSizeBytes = 1
for i := int64(0); i < 3; i++ {
meta, data := makeTestJob(i)
row, err := JobToParquetRow(meta, data)
if err != nil {
t.Fatalf("convert job %d: %v", i, err)
}
if err := pw.AddJob(*row); err != nil {
t.Fatalf("add job %d: %v", i, err)
}
}
if err := pw.Close(); err != nil {
t.Fatalf("close: %v", err)
}
// With maxSizeBytes=1, each AddJob should flush the previous batch,
// resulting in multiple files
if len(target.files) < 2 {
t.Errorf("expected multiple files due to batching, got %d", len(target.files))
}
// Verify all files are valid parquet
for name, data := range target.files {
file := bytes.NewReader(data)
_, err := pq.OpenFile(file, int64(len(data)))
if err != nil {
t.Errorf("invalid parquet file %s: %v", name, err)
}
}
}
func TestFileTarget(t *testing.T) {
dir := t.TempDir()
ft, err := NewFileTarget(dir)
if err != nil {
t.Fatalf("NewFileTarget: %v", err)
}
testData := []byte("test parquet data")
if err := ft.WriteFile("test.parquet", testData); err != nil {
t.Fatalf("WriteFile: %v", err)
}
// Read the file back from disk and verify its content
got, err := os.ReadFile(filepath.Join(dir, "test.parquet"))
if err != nil {
t.Fatalf("read back file: %v", err)
}
if !bytes.Equal(got, testData) {
t.Errorf("file content = %q, want %q", got, testData)
}
}