Add format conversion to archive manager

This commit is contained in:
2026-02-13 13:54:10 +01:00
parent 6e3462f962
commit 7d8b305cd9
7 changed files with 1002 additions and 2 deletions

View File

@@ -93,6 +93,91 @@ func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, er
}, nil
}
// ParquetRowToJob converts a ParquetJobRow back into job metadata and metric data.
// This is the reverse of JobToParquetRow.
func ParquetRowToJob(row *ParquetJobRow) (*schema.Job, *schema.JobData, error) {
	job := &schema.Job{
		JobID:        row.JobID,
		Cluster:      row.Cluster,
		SubCluster:   row.SubCluster,
		Partition:    row.Partition,
		Project:      row.Project,
		User:         row.User,
		State:        schema.JobState(row.State),
		StartTime:    row.StartTime,
		Duration:     row.Duration,
		Walltime:     row.Walltime,
		NumNodes:     row.NumNodes,
		NumHWThreads: row.NumHWThreads,
		NumAcc:       row.NumAcc,
		Energy:       row.Energy,
		SMT:          row.SMT,
	}

	// The optional job attributes are stored as JSON columns; decode each one
	// only when the column actually holds data, leaving the field nil otherwise.
	optionalFields := []struct {
		raw   []byte
		dst   any
		label string
	}{
		{row.ResourcesJSON, &job.Resources, "resources"},
		{row.StatisticsJSON, &job.Statistics, "statistics"},
		{row.TagsJSON, &job.Tags, "tags"},
		{row.MetaDataJSON, &job.MetaData, "metadata"},
		{row.FootprintJSON, &job.Footprint, "footprint"},
		{row.EnergyFootJSON, &job.EnergyFootprint, "energy footprint"},
	}
	for _, field := range optionalFields {
		if len(field.raw) == 0 {
			continue
		}
		if err := json.Unmarshal(field.raw, field.dst); err != nil {
			return nil, nil, fmt.Errorf("unmarshal %s: %w", field.label, err)
		}
	}

	// Metric data is stored gzip-compressed in its own column.
	jobData, err := decompressJobData(row.MetricDataGz)
	if err != nil {
		return nil, nil, fmt.Errorf("decompress metric data: %w", err)
	}
	return job, jobData, nil
}
// decompressJobData gunzips the given bytes and decodes the resulting JSON
// into a schema.JobData value.
func decompressJobData(compressed []byte) (*schema.JobData, error) {
	reader, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	// Drain the decompressed stream into memory before decoding.
	var raw bytes.Buffer
	if _, err := raw.ReadFrom(reader); err != nil {
		return nil, err
	}

	result := new(schema.JobData)
	if err := json.Unmarshal(raw.Bytes(), result); err != nil {
		return nil, err
	}
	return result, nil
}
func compressJobData(data *schema.JobData) ([]byte, error) {
jsonBytes, err := json.Marshal(data)
if err != nil {

View File

@@ -0,0 +1,305 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package parquet
import (
"testing"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// TestParquetRowToJob verifies that a fully-populated job survives a
// JobToParquetRow -> ParquetRowToJob round trip: every scalar field, each
// JSON-encoded optional field, and the compressed metric data.
func TestParquetRowToJob(t *testing.T) {
	// A job with every optional field populated, so that each JSON column
	// of the parquet row is exercised.
	meta := &schema.Job{
		JobID:        42,
		Cluster:      "testcluster",
		SubCluster:   "sc0",
		Partition:    "main",
		Project:      "testproject",
		User:         "testuser",
		State:        schema.JobStateCompleted,
		StartTime:    1700000000,
		Duration:     3600,
		Walltime:     7200,
		NumNodes:     2,
		NumHWThreads: 16,
		NumAcc:       4,
		Energy:       123.45,
		SMT:          2,
		Resources: []*schema.Resource{
			{Hostname: "node001", HWThreads: []int{0, 1, 2, 3}},
			{Hostname: "node002", HWThreads: []int{4, 5, 6, 7}},
		},
		Statistics: map[string]schema.JobStatistics{
			"cpu_load": {Avg: 50.0, Min: 10.0, Max: 90.0},
		},
		Tags: []*schema.Tag{
			{Type: "test", Name: "tag1"},
		},
		MetaData: map[string]string{
			"key1": "value1",
		},
		Footprint: map[string]float64{
			"cpu_load": 50.0,
		},
		EnergyFootprint: map[string]float64{
			"total": 123.45,
		},
	}
	// Minimal metric payload: one metric, node scope, a single series.
	data := &schema.JobData{
		"cpu_load": {
			schema.MetricScopeNode: &schema.JobMetric{
				Unit:     schema.Unit{Base: ""},
				Timestep: 60,
				Series: []schema.Series{
					{
						Hostname: "node001",
						Data:     []schema.Float{1.0, 2.0, 3.0},
					},
				},
			},
		},
	}
	// Convert to parquet row
	row, err := JobToParquetRow(meta, data)
	if err != nil {
		t.Fatalf("JobToParquetRow: %v", err)
	}
	// Convert back
	gotMeta, gotData, err := ParquetRowToJob(row)
	if err != nil {
		t.Fatalf("ParquetRowToJob: %v", err)
	}
	// Verify scalar fields
	if gotMeta.JobID != meta.JobID {
		t.Errorf("JobID = %d, want %d", gotMeta.JobID, meta.JobID)
	}
	if gotMeta.Cluster != meta.Cluster {
		t.Errorf("Cluster = %q, want %q", gotMeta.Cluster, meta.Cluster)
	}
	if gotMeta.SubCluster != meta.SubCluster {
		t.Errorf("SubCluster = %q, want %q", gotMeta.SubCluster, meta.SubCluster)
	}
	if gotMeta.Partition != meta.Partition {
		t.Errorf("Partition = %q, want %q", gotMeta.Partition, meta.Partition)
	}
	if gotMeta.Project != meta.Project {
		t.Errorf("Project = %q, want %q", gotMeta.Project, meta.Project)
	}
	if gotMeta.User != meta.User {
		t.Errorf("User = %q, want %q", gotMeta.User, meta.User)
	}
	if gotMeta.State != meta.State {
		t.Errorf("State = %q, want %q", gotMeta.State, meta.State)
	}
	if gotMeta.StartTime != meta.StartTime {
		t.Errorf("StartTime = %d, want %d", gotMeta.StartTime, meta.StartTime)
	}
	if gotMeta.Duration != meta.Duration {
		t.Errorf("Duration = %d, want %d", gotMeta.Duration, meta.Duration)
	}
	if gotMeta.Walltime != meta.Walltime {
		t.Errorf("Walltime = %d, want %d", gotMeta.Walltime, meta.Walltime)
	}
	if gotMeta.NumNodes != meta.NumNodes {
		t.Errorf("NumNodes = %d, want %d", gotMeta.NumNodes, meta.NumNodes)
	}
	if gotMeta.NumHWThreads != meta.NumHWThreads {
		t.Errorf("NumHWThreads = %d, want %d", gotMeta.NumHWThreads, meta.NumHWThreads)
	}
	if gotMeta.NumAcc != meta.NumAcc {
		t.Errorf("NumAcc = %d, want %d", gotMeta.NumAcc, meta.NumAcc)
	}
	if gotMeta.Energy != meta.Energy {
		t.Errorf("Energy = %f, want %f", gotMeta.Energy, meta.Energy)
	}
	if gotMeta.SMT != meta.SMT {
		t.Errorf("SMT = %d, want %d", gotMeta.SMT, meta.SMT)
	}
	// Verify complex fields (round-tripped through their JSON columns)
	if len(gotMeta.Resources) != 2 {
		t.Fatalf("Resources len = %d, want 2", len(gotMeta.Resources))
	}
	if gotMeta.Resources[0].Hostname != "node001" {
		t.Errorf("Resources[0].Hostname = %q, want %q", gotMeta.Resources[0].Hostname, "node001")
	}
	if len(gotMeta.Resources[0].HWThreads) != 4 {
		t.Errorf("Resources[0].HWThreads len = %d, want 4", len(gotMeta.Resources[0].HWThreads))
	}
	if len(gotMeta.Statistics) != 1 {
		t.Fatalf("Statistics len = %d, want 1", len(gotMeta.Statistics))
	}
	if stat, ok := gotMeta.Statistics["cpu_load"]; !ok {
		t.Error("Statistics missing cpu_load")
	} else if stat.Avg != 50.0 {
		t.Errorf("Statistics[cpu_load].Avg = %f, want 50.0", stat.Avg)
	}
	if len(gotMeta.Tags) != 1 || gotMeta.Tags[0].Name != "tag1" {
		t.Errorf("Tags = %v, want [{test tag1}]", gotMeta.Tags)
	}
	if gotMeta.MetaData["key1"] != "value1" {
		t.Errorf("MetaData[key1] = %q, want %q", gotMeta.MetaData["key1"], "value1")
	}
	if gotMeta.Footprint["cpu_load"] != 50.0 {
		t.Errorf("Footprint[cpu_load] = %f, want 50.0", gotMeta.Footprint["cpu_load"])
	}
	if gotMeta.EnergyFootprint["total"] != 123.45 {
		t.Errorf("EnergyFootprint[total] = %f, want 123.45", gotMeta.EnergyFootprint["total"])
	}
	// Verify metric data (round-tripped through gzip compression)
	if gotData == nil {
		t.Fatal("JobData is nil")
	}
	cpuLoad, ok := (*gotData)["cpu_load"]
	if !ok {
		t.Fatal("JobData missing cpu_load")
	}
	nodeMetric, ok := cpuLoad[schema.MetricScopeNode]
	if !ok {
		t.Fatal("cpu_load missing node scope")
	}
	if nodeMetric.Timestep != 60 {
		t.Errorf("Timestep = %d, want 60", nodeMetric.Timestep)
	}
	if len(nodeMetric.Series) != 1 {
		t.Fatalf("Series len = %d, want 1", len(nodeMetric.Series))
	}
	if nodeMetric.Series[0].Hostname != "node001" {
		t.Errorf("Series[0].Hostname = %q, want %q", nodeMetric.Series[0].Hostname, "node001")
	}
	if len(nodeMetric.Series[0].Data) != 3 {
		t.Errorf("Series[0].Data len = %d, want 3", len(nodeMetric.Series[0].Data))
	}
}
// TestParquetRowToJobNilOptionalFields checks that optional attributes which
// were absent on the original job come back as nil after a round trip, rather
// than as empty non-nil values.
func TestParquetRowToJobNilOptionalFields(t *testing.T) {
	// A minimal job: only required fields set, all optional maps/slices nil.
	job := &schema.Job{
		JobID:      1,
		Cluster:    "test",
		SubCluster: "sc0",
		Project:    "proj",
		User:       "user",
		State:      schema.JobStateCompleted,
		StartTime:  1700000000,
		Duration:   60,
		NumNodes:   1,
		Resources:  []*schema.Resource{{Hostname: "node001"}},
	}
	metrics := &schema.JobData{
		"cpu_load": {
			schema.MetricScopeNode: &schema.JobMetric{
				Timestep: 60,
				Series:   []schema.Series{{Hostname: "node001", Data: []schema.Float{1.0}}},
			},
		},
	}

	row, err := JobToParquetRow(job, metrics)
	if err != nil {
		t.Fatalf("JobToParquetRow: %v", err)
	}
	outMeta, outData, err := ParquetRowToJob(row)
	if err != nil {
		t.Fatalf("ParquetRowToJob: %v", err)
	}

	if outMeta.JobID != 1 {
		t.Errorf("JobID = %d, want 1", outMeta.JobID)
	}
	// Each unset optional field must still be nil after decoding.
	if outMeta.Tags != nil {
		t.Errorf("Tags should be nil, got %v", outMeta.Tags)
	}
	if outMeta.Statistics != nil {
		t.Errorf("Statistics should be nil, got %v", outMeta.Statistics)
	}
	if outMeta.MetaData != nil {
		t.Errorf("MetaData should be nil, got %v", outMeta.MetaData)
	}
	if outMeta.Footprint != nil {
		t.Errorf("Footprint should be nil, got %v", outMeta.Footprint)
	}
	if outMeta.EnergyFootprint != nil {
		t.Errorf("EnergyFootprint should be nil, got %v", outMeta.EnergyFootprint)
	}
	if outData == nil {
		t.Fatal("JobData is nil")
	}
}
// TestRoundTripThroughParquetFile serializes a job into actual parquet bytes
// and reads it back again, verifying key fields survive the full file-level
// round trip (not just the row conversion).
func TestRoundTripThroughParquetFile(t *testing.T) {
	job, jobData := makeTestJob(999)
	job.Tags = []*schema.Tag{{Type: "test", Name: "roundtrip"}}

	// Job -> row.
	row, err := JobToParquetRow(job, jobData)
	if err != nil {
		t.Fatalf("JobToParquetRow: %v", err)
	}

	// Row -> parquet bytes.
	encoded, err := writeParquetBytes([]ParquetJobRow{*row})
	if err != nil {
		t.Fatalf("writeParquetBytes: %v", err)
	}

	// Parquet bytes -> rows.
	decoded, err := ReadParquetFile(encoded)
	if err != nil {
		t.Fatalf("ReadParquetFile: %v", err)
	}
	if len(decoded) != 1 {
		t.Fatalf("expected 1 row, got %d", len(decoded))
	}

	// Row -> job.
	outMeta, outData, err := ParquetRowToJob(&decoded[0])
	if err != nil {
		t.Fatalf("ParquetRowToJob: %v", err)
	}

	// Spot-check fields that must survive the round trip.
	if outMeta.JobID != 999 {
		t.Errorf("JobID = %d, want 999", outMeta.JobID)
	}
	if outMeta.Cluster != "testcluster" {
		t.Errorf("Cluster = %q, want %q", outMeta.Cluster, "testcluster")
	}
	if outMeta.User != "testuser" {
		t.Errorf("User = %q, want %q", outMeta.User, "testuser")
	}
	if outMeta.State != schema.JobStateCompleted {
		t.Errorf("State = %q, want %q", outMeta.State, schema.JobStateCompleted)
	}
	if len(outMeta.Tags) != 1 || outMeta.Tags[0].Name != "roundtrip" {
		t.Errorf("Tags = %v, want [{test roundtrip}]", outMeta.Tags)
	}
	if len(outMeta.Resources) != 2 {
		t.Errorf("Resources len = %d, want 2", len(outMeta.Resources))
	}
	if outData == nil {
		t.Fatal("JobData is nil")
	}
	if _, ok := (*outData)["cpu_load"]; !ok {
		t.Error("JobData missing cpu_load")
	}
}

View File

@@ -0,0 +1,216 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package parquet
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/ClusterCockpit/cc-lib/v2/schema"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
pq "github.com/parquet-go/parquet-go"
)
// ReadParquetFile reads all ParquetJobRow entries from parquet-encoded bytes.
//
// It returns all rows declared in the file's footer. Files written with
// multiple row groups are handled correctly by reading in a loop.
func ReadParquetFile(data []byte) ([]ParquetJobRow, error) {
	file, err := pq.OpenFile(bytes.NewReader(data), int64(len(data)))
	if err != nil {
		return nil, fmt.Errorf("open parquet: %w", err)
	}
	reader := pq.NewGenericReader[ParquetJobRow](file)
	defer reader.Close()

	numRows := int(file.NumRows())
	rows := make([]ParquetJobRow, numRows)
	// A single Read call is not guaranteed to fill the slice (it may stop at
	// a row-group boundary), so keep reading until every declared row is in
	// or the reader reports EOF.
	total := 0
	for total < numRows {
		n, err := reader.Read(rows[total:])
		total += n
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read parquet rows: %w", err)
		}
	}
	return rows[:total], nil
}
// ParquetSource abstracts reading parquet archives from different storage backends.
//
// Two implementations exist in this package: FileParquetSource for local
// directories and S3ParquetSource for S3-compatible object stores.
type ParquetSource interface {
	// GetClusters returns the names of all clusters present in the archive.
	GetClusters() ([]string, error)
	// ListParquetFiles returns the paths of all .parquet files belonging to
	// the given cluster; the paths are suitable for passing to ReadFile.
	ListParquetFiles(cluster string) ([]string, error)
	// ReadFile returns the raw contents of the file at the given path.
	ReadFile(path string) ([]byte, error)
	// ReadClusterConfig loads and decodes the cluster's cluster.json.
	ReadClusterConfig(cluster string) (*schema.Cluster, error)
}
// FileParquetSource reads parquet archives from a local filesystem directory.
type FileParquetSource struct {
	// path is the archive root; it holds one subdirectory per cluster.
	path string
}

// NewFileParquetSource creates a FileParquetSource rooted at path.
// The directory is not validated here; errors surface on first use.
func NewFileParquetSource(path string) *FileParquetSource {
	return &FileParquetSource{path: path}
}
// GetClusters returns the name of every subdirectory under the archive root;
// each subdirectory corresponds to one cluster.
func (fs *FileParquetSource) GetClusters() ([]string, error) {
	entries, err := os.ReadDir(fs.path)
	if err != nil {
		return nil, fmt.Errorf("read directory: %w", err)
	}
	var names []string
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		names = append(names, entry.Name())
	}
	return names, nil
}
// ListParquetFiles returns the cluster-relative paths of all .parquet files
// in the given cluster's directory.
func (fs *FileParquetSource) ListParquetFiles(cluster string) ([]string, error) {
	entries, err := os.ReadDir(filepath.Join(fs.path, cluster))
	if err != nil {
		return nil, fmt.Errorf("read cluster directory: %w", err)
	}
	var files []string
	for _, entry := range entries {
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".parquet") {
			continue
		}
		// Keep paths relative to the archive root so ReadFile can resolve them.
		files = append(files, filepath.Join(cluster, entry.Name()))
	}
	return files, nil
}
// ReadFile returns the contents of the file at path, resolved relative to
// the archive root.
func (fs *FileParquetSource) ReadFile(path string) ([]byte, error) {
	fullPath := filepath.Join(fs.path, path)
	return os.ReadFile(fullPath)
}
// ReadClusterConfig loads and decodes <root>/<cluster>/cluster.json.
func (fs *FileParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error) {
	raw, err := os.ReadFile(filepath.Join(fs.path, cluster, "cluster.json"))
	if err != nil {
		return nil, fmt.Errorf("read cluster.json: %w", err)
	}
	cfg := new(schema.Cluster)
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, fmt.Errorf("unmarshal cluster config: %w", err)
	}
	return cfg, nil
}
// S3ParquetSource reads parquet archives from an S3-compatible object store.
type S3ParquetSource struct {
	// client is the configured S3 API client.
	client *s3.Client
	// bucket holds the archive; clusters appear as top-level key prefixes.
	bucket string
}
// NewS3ParquetSource creates an S3-backed parquet source from the given
// target configuration. The bucket name is required; region defaults to
// us-east-1, and a custom endpoint plus path-style addressing support
// S3-compatible stores such as MinIO.
func NewS3ParquetSource(cfg S3TargetConfig) (*S3ParquetSource, error) {
	if cfg.Bucket == "" {
		return nil, fmt.Errorf("S3 source: empty bucket name")
	}

	// Fall back to a default region when none was configured.
	region := "us-east-1"
	if cfg.Region != "" {
		region = cfg.Region
	}

	awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(),
		awsconfig.WithRegion(region),
		awsconfig.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, ""),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("S3 source: load AWS config: %w", err)
	}

	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		if cfg.Endpoint != "" {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
		}
		o.UsePathStyle = cfg.UsePathStyle
	})
	return &S3ParquetSource{client: client, bucket: cfg.Bucket}, nil
}
// GetClusters lists the top-level key prefixes of the bucket; each prefix
// (minus its trailing slash) is treated as one cluster name.
func (ss *S3ParquetSource) GetClusters() ([]string, error) {
	ctx := context.Background()
	pager := s3.NewListObjectsV2Paginator(ss.client, &s3.ListObjectsV2Input{
		Bucket:    aws.String(ss.bucket),
		Delimiter: aws.String("/"),
	})
	var clusters []string
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("S3 source: list clusters: %w", err)
		}
		for _, cp := range page.CommonPrefixes {
			if cp.Prefix == nil {
				continue
			}
			clusters = append(clusters, strings.TrimSuffix(*cp.Prefix, "/"))
		}
	}
	return clusters, nil
}
// ListParquetFiles returns the keys of all .parquet objects stored under the
// cluster's prefix in the bucket.
func (ss *S3ParquetSource) ListParquetFiles(cluster string) ([]string, error) {
	ctx := context.Background()
	pager := s3.NewListObjectsV2Paginator(ss.client, &s3.ListObjectsV2Input{
		Bucket: aws.String(ss.bucket),
		Prefix: aws.String(cluster + "/"),
	})
	var keys []string
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("S3 source: list parquet files: %w", err)
		}
		for _, obj := range page.Contents {
			if obj.Key == nil || !strings.HasSuffix(*obj.Key, ".parquet") {
				continue
			}
			keys = append(keys, *obj.Key)
		}
	}
	return keys, nil
}
// ReadFile fetches the object at the given key and returns its full contents.
func (ss *S3ParquetSource) ReadFile(path string) ([]byte, error) {
	result, err := ss.client.GetObject(context.Background(), &s3.GetObjectInput{
		Bucket: aws.String(ss.bucket),
		Key:    aws.String(path),
	})
	if err != nil {
		return nil, fmt.Errorf("S3 source: get object %q: %w", path, err)
	}
	defer result.Body.Close()
	return io.ReadAll(result.Body)
}
// ReadClusterConfig loads and decodes the object <cluster>/cluster.json.
func (ss *S3ParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error) {
	raw, err := ss.ReadFile(cluster + "/cluster.json")
	if err != nil {
		return nil, fmt.Errorf("read cluster.json: %w", err)
	}
	cfg := new(schema.Cluster)
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, fmt.Errorf("unmarshal cluster config: %w", err)
	}
	return cfg, nil
}