diff --git a/CLAUDE.md b/CLAUDE.md index 2148fdca..a8d56571 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -108,6 +108,7 @@ The backend follows a layered architecture with clear separation of concerns: - File system backend (default) - S3 backend - SQLite backend (experimental) + - **parquet** sub-package: Parquet format support (schema, reader, writer, conversion) - **internal/metricstoreclient**: Client for cc-metric-store queries ### Frontend Structure diff --git a/README.md b/README.md index d01c7140..3306f838 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,8 @@ ln -s ./var/job-archive - [`tools/`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools) Additional command line helper tools. - [`archive-manager`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-manager) - Commands for getting infos about an existing job archive. + Commands for getting infos about an existing job archive, importing jobs + between archive backends, and converting archives between JSON and Parquet formats. - [`archive-migration`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-migration) Tool for migrating job archives between formats. - [`convert-pem-pubkey`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/convert-pem-pubkey) diff --git a/pkg/archive/parquet/convert.go b/pkg/archive/parquet/convert.go index ba1e76eb..43e611e4 100644 --- a/pkg/archive/parquet/convert.go +++ b/pkg/archive/parquet/convert.go @@ -93,6 +93,91 @@ func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, er }, nil } +// ParquetRowToJob converts a ParquetJobRow back into job metadata and metric data. +// This is the reverse of JobToParquetRow. +func ParquetRowToJob(row *ParquetJobRow) (*schema.Job, *schema.JobData, error) { + meta := &schema.Job{ + JobID: row.JobID, + Cluster: row.Cluster, + SubCluster: row.SubCluster, + Partition: row.Partition, + Project: row.Project, + User: row.User, + State: schema.JobState(row.State), + StartTime: row.StartTime, + Duration: row.Duration, + Walltime: row.Walltime, + NumNodes: row.NumNodes, + NumHWThreads: row.NumHWThreads, + NumAcc: row.NumAcc, + Energy: row.Energy, + SMT: row.SMT, + } + + if len(row.ResourcesJSON) > 0 { + if err := json.Unmarshal(row.ResourcesJSON, &meta.Resources); err != nil { + return nil, nil, fmt.Errorf("unmarshal resources: %w", err) + } + } + + if len(row.StatisticsJSON) > 0 { + if err := json.Unmarshal(row.StatisticsJSON, &meta.Statistics); err != nil { + return nil, nil, fmt.Errorf("unmarshal statistics: %w", err) + } + } + + if len(row.TagsJSON) > 0 { + if err := json.Unmarshal(row.TagsJSON, &meta.Tags); err != nil { + return nil, nil, fmt.Errorf("unmarshal tags: %w", err) + } + } + + if len(row.MetaDataJSON) > 0 { + if err := json.Unmarshal(row.MetaDataJSON, &meta.MetaData); err != nil { + return nil, nil, fmt.Errorf("unmarshal metadata: %w", err) + } + } + + if len(row.FootprintJSON) > 0 { + if err := json.Unmarshal(row.FootprintJSON, &meta.Footprint); err != nil { + return nil, nil, fmt.Errorf("unmarshal footprint: %w", err) + } + } + + if len(row.EnergyFootJSON) > 0 { + if err := json.Unmarshal(row.EnergyFootJSON, &meta.EnergyFootprint); err != nil { + return nil, nil, fmt.Errorf("unmarshal energy footprint: %w", err) + } + } + + data, err := decompressJobData(row.MetricDataGz) + if err != nil { + return nil, nil, fmt.Errorf("decompress metric data: %w", err) + } + + return meta, data, nil +} + +func decompressJobData(data []byte) (*schema.JobData, error) { + gz, err := 
gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + defer gz.Close() + + var buf bytes.Buffer + if _, err := buf.ReadFrom(gz); err != nil { + return nil, err + } + + var jobData schema.JobData + if err := json.Unmarshal(buf.Bytes(), &jobData); err != nil { + return nil, err + } + + return &jobData, nil +} + func compressJobData(data *schema.JobData) ([]byte, error) { jsonBytes, err := json.Marshal(data) if err != nil { diff --git a/pkg/archive/parquet/convert_test.go b/pkg/archive/parquet/convert_test.go new file mode 100644 index 00000000..3b2848ba --- /dev/null +++ b/pkg/archive/parquet/convert_test.go @@ -0,0 +1,305 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +import ( + "testing" + + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +func TestParquetRowToJob(t *testing.T) { + meta := &schema.Job{ + JobID: 42, + Cluster: "testcluster", + SubCluster: "sc0", + Partition: "main", + Project: "testproject", + User: "testuser", + State: schema.JobStateCompleted, + StartTime: 1700000000, + Duration: 3600, + Walltime: 7200, + NumNodes: 2, + NumHWThreads: 16, + NumAcc: 4, + Energy: 123.45, + SMT: 2, + Resources: []*schema.Resource{ + {Hostname: "node001", HWThreads: []int{0, 1, 2, 3}}, + {Hostname: "node002", HWThreads: []int{4, 5, 6, 7}}, + }, + Statistics: map[string]schema.JobStatistics{ + "cpu_load": {Avg: 50.0, Min: 10.0, Max: 90.0}, + }, + Tags: []*schema.Tag{ + {Type: "test", Name: "tag1"}, + }, + MetaData: map[string]string{ + "key1": "value1", + }, + Footprint: map[string]float64{ + "cpu_load": 50.0, + }, + EnergyFootprint: map[string]float64{ + "total": 123.45, + }, + } + + data := &schema.JobData{ + "cpu_load": { + schema.MetricScopeNode: &schema.JobMetric{ + Unit: schema.Unit{Base: ""}, + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "node001", + Data: []schema.Float{1.0, 2.0, 3.0}, + }, + }, + }, + }, + } + + // Convert to parquet row + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("JobToParquetRow: %v", err) + } + + // Convert back + gotMeta, gotData, err := ParquetRowToJob(row) + if err != nil { + t.Fatalf("ParquetRowToJob: %v", err) + } + + // Verify scalar fields + if gotMeta.JobID != meta.JobID { + t.Errorf("JobID = %d, want %d", gotMeta.JobID, meta.JobID) + } + if gotMeta.Cluster != meta.Cluster { + t.Errorf("Cluster = %q, want %q", gotMeta.Cluster, meta.Cluster) + } + if gotMeta.SubCluster != meta.SubCluster { + t.Errorf("SubCluster = %q, want %q", gotMeta.SubCluster, meta.SubCluster) + } + if gotMeta.Partition != meta.Partition { + t.Errorf("Partition = %q, want %q", gotMeta.Partition, meta.Partition) + } + if gotMeta.Project != meta.Project { + t.Errorf("Project = %q, want %q", gotMeta.Project, meta.Project) + } + if gotMeta.User != meta.User { + t.Errorf("User = %q, want %q", gotMeta.User, meta.User) + } + if gotMeta.State != meta.State { + t.Errorf("State = %q, want %q", gotMeta.State, meta.State) + } + if gotMeta.StartTime != meta.StartTime { + t.Errorf("StartTime = %d, want %d", gotMeta.StartTime, meta.StartTime) + } + if gotMeta.Duration != meta.Duration { + t.Errorf("Duration = %d, want %d", gotMeta.Duration, meta.Duration) + } + if gotMeta.Walltime != meta.Walltime { + t.Errorf("Walltime = %d, want %d", gotMeta.Walltime, meta.Walltime) + } + if gotMeta.NumNodes != meta.NumNodes { + t.Errorf("NumNodes = %d, 
want %d", gotMeta.NumNodes, meta.NumNodes) + } + if gotMeta.NumHWThreads != meta.NumHWThreads { + t.Errorf("NumHWThreads = %d, want %d", gotMeta.NumHWThreads, meta.NumHWThreads) + } + if gotMeta.NumAcc != meta.NumAcc { + t.Errorf("NumAcc = %d, want %d", gotMeta.NumAcc, meta.NumAcc) + } + if gotMeta.Energy != meta.Energy { + t.Errorf("Energy = %f, want %f", gotMeta.Energy, meta.Energy) + } + if gotMeta.SMT != meta.SMT { + t.Errorf("SMT = %d, want %d", gotMeta.SMT, meta.SMT) + } + + // Verify complex fields + if len(gotMeta.Resources) != 2 { + t.Fatalf("Resources len = %d, want 2", len(gotMeta.Resources)) + } + if gotMeta.Resources[0].Hostname != "node001" { + t.Errorf("Resources[0].Hostname = %q, want %q", gotMeta.Resources[0].Hostname, "node001") + } + if len(gotMeta.Resources[0].HWThreads) != 4 { + t.Errorf("Resources[0].HWThreads len = %d, want 4", len(gotMeta.Resources[0].HWThreads)) + } + + if len(gotMeta.Statistics) != 1 { + t.Fatalf("Statistics len = %d, want 1", len(gotMeta.Statistics)) + } + if stat, ok := gotMeta.Statistics["cpu_load"]; !ok { + t.Error("Statistics missing cpu_load") + } else if stat.Avg != 50.0 { + t.Errorf("Statistics[cpu_load].Avg = %f, want 50.0", stat.Avg) + } + + if len(gotMeta.Tags) != 1 || gotMeta.Tags[0].Name != "tag1" { + t.Errorf("Tags = %v, want [{test tag1}]", gotMeta.Tags) + } + + if gotMeta.MetaData["key1"] != "value1" { + t.Errorf("MetaData[key1] = %q, want %q", gotMeta.MetaData["key1"], "value1") + } + + if gotMeta.Footprint["cpu_load"] != 50.0 { + t.Errorf("Footprint[cpu_load] = %f, want 50.0", gotMeta.Footprint["cpu_load"]) + } + + if gotMeta.EnergyFootprint["total"] != 123.45 { + t.Errorf("EnergyFootprint[total] = %f, want 123.45", gotMeta.EnergyFootprint["total"]) + } + + // Verify metric data + if gotData == nil { + t.Fatal("JobData is nil") + } + cpuLoad, ok := (*gotData)["cpu_load"] + if !ok { + t.Fatal("JobData missing cpu_load") + } + nodeMetric, ok := cpuLoad[schema.MetricScopeNode] + if !ok { + t.Fatal("cpu_load missing node scope") + } + if nodeMetric.Timestep != 60 { + t.Errorf("Timestep = %d, want 60", nodeMetric.Timestep) + } + if len(nodeMetric.Series) != 1 { + t.Fatalf("Series len = %d, want 1", len(nodeMetric.Series)) + } + if nodeMetric.Series[0].Hostname != "node001" { + t.Errorf("Series[0].Hostname = %q, want %q", nodeMetric.Series[0].Hostname, "node001") + } + if len(nodeMetric.Series[0].Data) != 3 { + t.Errorf("Series[0].Data len = %d, want 3", len(nodeMetric.Series[0].Data)) + } +} + +func TestParquetRowToJobNilOptionalFields(t *testing.T) { + meta := &schema.Job{ + JobID: 1, + Cluster: "test", + SubCluster: "sc0", + Project: "proj", + User: "user", + State: schema.JobStateCompleted, + StartTime: 1700000000, + Duration: 60, + NumNodes: 1, + Resources: []*schema.Resource{ + {Hostname: "node001"}, + }, + } + + data := &schema.JobData{ + "cpu_load": { + schema.MetricScopeNode: &schema.JobMetric{ + Timestep: 60, + Series: []schema.Series{ + {Hostname: "node001", Data: []schema.Float{1.0}}, + }, + }, + }, + } + + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("JobToParquetRow: %v", err) + } + + gotMeta, gotData, err := ParquetRowToJob(row) + if err != nil { + t.Fatalf("ParquetRowToJob: %v", err) + } + + if gotMeta.JobID != 1 { + t.Errorf("JobID = %d, want 1", gotMeta.JobID) + } + if gotMeta.Tags != nil { + t.Errorf("Tags should be nil, got %v", gotMeta.Tags) + } + if gotMeta.Statistics != nil { + t.Errorf("Statistics should be nil, got %v", gotMeta.Statistics) + } + if gotMeta.MetaData != nil { + 
t.Errorf("MetaData should be nil, got %v", gotMeta.MetaData) + } + if gotMeta.Footprint != nil { + t.Errorf("Footprint should be nil, got %v", gotMeta.Footprint) + } + if gotMeta.EnergyFootprint != nil { + t.Errorf("EnergyFootprint should be nil, got %v", gotMeta.EnergyFootprint) + } + if gotData == nil { + t.Fatal("JobData is nil") + } +} + +func TestRoundTripThroughParquetFile(t *testing.T) { + meta, data := makeTestJob(999) + meta.Tags = []*schema.Tag{{Type: "test", Name: "roundtrip"}} + + // Convert to row and write to parquet + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("JobToParquetRow: %v", err) + } + + // Write to parquet bytes + parquetBytes, err := writeParquetBytes([]ParquetJobRow{*row}) + if err != nil { + t.Fatalf("writeParquetBytes: %v", err) + } + + // Read back from parquet bytes + rows, err := ReadParquetFile(parquetBytes) + if err != nil { + t.Fatalf("ReadParquetFile: %v", err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + + // Convert back to job + gotMeta, gotData, err := ParquetRowToJob(&rows[0]) + if err != nil { + t.Fatalf("ParquetRowToJob: %v", err) + } + + // Verify key fields survived the round trip + if gotMeta.JobID != 999 { + t.Errorf("JobID = %d, want 999", gotMeta.JobID) + } + if gotMeta.Cluster != "testcluster" { + t.Errorf("Cluster = %q, want %q", gotMeta.Cluster, "testcluster") + } + if gotMeta.User != "testuser" { + t.Errorf("User = %q, want %q", gotMeta.User, "testuser") + } + if gotMeta.State != schema.JobStateCompleted { + t.Errorf("State = %q, want %q", gotMeta.State, schema.JobStateCompleted) + } + if len(gotMeta.Tags) != 1 || gotMeta.Tags[0].Name != "roundtrip" { + t.Errorf("Tags = %v, want [{test roundtrip}]", gotMeta.Tags) + } + if len(gotMeta.Resources) != 2 { + t.Errorf("Resources len = %d, want 2", len(gotMeta.Resources)) + } + + if gotData == nil { + t.Fatal("JobData is nil") + } + if _, ok := (*gotData)["cpu_load"]; !ok { + t.Error("JobData missing cpu_load") + } +} diff --git a/pkg/archive/parquet/reader.go b/pkg/archive/parquet/reader.go new file mode 100644 index 00000000..32486bd5 --- /dev/null +++ b/pkg/archive/parquet/reader.go @@ -0,0 +1,216 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/ClusterCockpit/cc-lib/v2/schema" + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + pq "github.com/parquet-go/parquet-go" +) + +// ReadParquetFile reads all ParquetJobRow entries from parquet-encoded bytes. +func ReadParquetFile(data []byte) ([]ParquetJobRow, error) { + file, err := pq.OpenFile(bytes.NewReader(data), int64(len(data))) + if err != nil { + return nil, fmt.Errorf("open parquet: %w", err) + } + + reader := pq.NewGenericReader[ParquetJobRow](file) + defer reader.Close() + + numRows := file.NumRows() + rows := make([]ParquetJobRow, numRows) + n, err := reader.Read(rows) + if err != nil && err != io.EOF { + return nil, fmt.Errorf("read parquet rows: %w", err) + } + + return rows[:n], nil +} + +// ParquetSource abstracts reading parquet archives from different storage backends. 
+type ParquetSource interface { + GetClusters() ([]string, error) + ListParquetFiles(cluster string) ([]string, error) + ReadFile(path string) ([]byte, error) + ReadClusterConfig(cluster string) (*schema.Cluster, error) +} + +// FileParquetSource reads parquet archives from a local filesystem directory. +type FileParquetSource struct { + path string +} + +func NewFileParquetSource(path string) *FileParquetSource { + return &FileParquetSource{path: path} +} + +func (fs *FileParquetSource) GetClusters() ([]string, error) { + entries, err := os.ReadDir(fs.path) + if err != nil { + return nil, fmt.Errorf("read directory: %w", err) + } + + var clusters []string + for _, e := range entries { + if e.IsDir() { + clusters = append(clusters, e.Name()) + } + } + return clusters, nil +} + +func (fs *FileParquetSource) ListParquetFiles(cluster string) ([]string, error) { + dir := filepath.Join(fs.path, cluster) + entries, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("read cluster directory: %w", err) + } + + var files []string + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".parquet") { + files = append(files, filepath.Join(cluster, e.Name())) + } + } + return files, nil +} + +func (fs *FileParquetSource) ReadFile(path string) ([]byte, error) { + return os.ReadFile(filepath.Join(fs.path, path)) +} + +func (fs *FileParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error) { + data, err := os.ReadFile(filepath.Join(fs.path, cluster, "cluster.json")) + if err != nil { + return nil, fmt.Errorf("read cluster.json: %w", err) + } + var cfg schema.Cluster + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("unmarshal cluster config: %w", err) + } + return &cfg, nil +} + +// S3ParquetSource reads parquet archives from an S3-compatible object store. 
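+// Endpoint, bucket, credentials, region, and path-style addressing are taken
+// from the S3TargetConfig passed to NewS3ParquetSource.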
+type S3ParquetSource struct { + client *s3.Client + bucket string +} + +func NewS3ParquetSource(cfg S3TargetConfig) (*S3ParquetSource, error) { + if cfg.Bucket == "" { + return nil, fmt.Errorf("S3 source: empty bucket name") + } + + region := cfg.Region + if region == "" { + region = "us-east-1" + } + + awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithRegion(region), + awsconfig.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, ""), + ), + ) + if err != nil { + return nil, fmt.Errorf("S3 source: load AWS config: %w", err) + } + + opts := func(o *s3.Options) { + if cfg.Endpoint != "" { + o.BaseEndpoint = aws.String(cfg.Endpoint) + } + o.UsePathStyle = cfg.UsePathStyle + } + + client := s3.NewFromConfig(awsCfg, opts) + return &S3ParquetSource{client: client, bucket: cfg.Bucket}, nil +} + +func (ss *S3ParquetSource) GetClusters() ([]string, error) { + ctx := context.Background() + paginator := s3.NewListObjectsV2Paginator(ss.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(ss.bucket), + Delimiter: aws.String("/"), + }) + + var clusters []string + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("S3 source: list clusters: %w", err) + } + for _, prefix := range page.CommonPrefixes { + if prefix.Prefix != nil { + name := strings.TrimSuffix(*prefix.Prefix, "/") + clusters = append(clusters, name) + } + } + } + return clusters, nil +} + +func (ss *S3ParquetSource) ListParquetFiles(cluster string) ([]string, error) { + ctx := context.Background() + prefix := cluster + "/" + paginator := s3.NewListObjectsV2Paginator(ss.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(ss.bucket), + Prefix: aws.String(prefix), + }) + + var files []string + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("S3 source: list parquet files: %w", err) + } + for _, obj := range page.Contents { + if obj.Key != nil && strings.HasSuffix(*obj.Key, ".parquet") { + files = append(files, *obj.Key) + } + } + } + return files, nil +} + +func (ss *S3ParquetSource) ReadFile(path string) ([]byte, error) { + ctx := context.Background() + result, err := ss.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(ss.bucket), + Key: aws.String(path), + }) + if err != nil { + return nil, fmt.Errorf("S3 source: get object %q: %w", path, err) + } + defer result.Body.Close() + return io.ReadAll(result.Body) +} + +func (ss *S3ParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error) { + data, err := ss.ReadFile(cluster + "/cluster.json") + if err != nil { + return nil, fmt.Errorf("read cluster.json: %w", err) + } + var cfg schema.Cluster + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("unmarshal cluster config: %w", err) + } + return &cfg, nil +} diff --git a/tools/archive-manager/README.md b/tools/archive-manager/README.md new file mode 100644 index 00000000..c006a63e --- /dev/null +++ b/tools/archive-manager/README.md @@ -0,0 +1,148 @@ +# Archive Manager + +## Overview + +The `archive-manager` tool manages ClusterCockpit job archives. It supports inspecting archives, validating jobs, removing jobs by date range, importing jobs between archive backends, and converting archives between JSON and Parquet formats. 
+ +## Features + +- **Archive Info**: Display statistics about an existing job archive +- **Validation**: Validate job archives against the JSON schema +- **Cleanup**: Remove jobs by date range +- **Import**: Copy jobs between archive backends (file, S3, SQLite) with parallel processing +- **Convert**: Convert archives between JSON and Parquet formats (both directions) +- **Progress Reporting**: Real-time progress display with ETA and throughput metrics +- **Graceful Interruption**: CTRL-C stops processing after finishing current jobs + +## Usage + +### Build + +```bash +go build ./tools/archive-manager/ +``` + +### Archive Info + +Display statistics about a job archive: + +```bash +./archive-manager -s ./var/job-archive +``` + +### Validate Archive + +```bash +./archive-manager -s ./var/job-archive --validate --config ./config.json +``` + +### Remove Jobs by Date + +```bash +# Remove jobs started before a date +./archive-manager -s ./var/job-archive --remove-before 2023-Jan-01 --config ./config.json + +# Remove jobs started after a date +./archive-manager -s ./var/job-archive --remove-after 2024-Dec-31 --config ./config.json +``` + +### Import Between Backends + +Import jobs from one archive backend to another (e.g., file to S3, file to SQLite): + +```bash +./archive-manager --import \ + --src-config '{"kind":"file","path":"./var/job-archive"}' \ + --dst-config '{"kind":"s3","endpoint":"https://s3.example.com","bucket":"archive","access-key":"...","secret-key":"..."}' +``` + +### Convert JSON to Parquet + +Convert a JSON job archive to Parquet format: + +```bash +./archive-manager --convert --format parquet \ + --src-config '{"kind":"file","path":"./var/job-archive"}' \ + --dst-config '{"kind":"file","path":"./var/parquet-archive"}' +``` + +The source (`--src-config`) is a standard archive backend config (file, S3, or SQLite). The destination (`--dst-config`) specifies where to write parquet files. + +### Convert Parquet to JSON + +Convert a Parquet archive back to JSON format: + +```bash +./archive-manager --convert --format json \ + --src-config '{"kind":"file","path":"./var/parquet-archive"}' \ + --dst-config '{"kind":"file","path":"./var/json-archive"}' +``` + +The source (`--src-config`) points to a directory or S3 bucket containing parquet files organized by cluster. The destination (`--dst-config`) is a standard archive backend config. 
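+
+The same conversion is also available programmatically through the
+`pkg/archive/parquet` package that this tool builds on. The sketch below is
+illustrative only (the package name `sketch` and the function `WriteOneJob` are
+not part of cc-backend); it writes a single job to a local Parquet target using
+the exported API. Note that the CLI additionally calls `SetClusterConfig` for
+every cluster before adding jobs, which is omitted here.
+
+```go
+package sketch // illustrative only; not part of cc-backend
+
+import (
+	pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet"
+	"github.com/ClusterCockpit/cc-lib/v2/schema"
+)
+
+// WriteOneJob converts a single job to a Parquet row and hands it to a
+// size-limited, cluster-aware writer backed by a local directory.
+func WriteOneJob(meta *schema.Job, data *schema.JobData) error {
+	target, err := pqarchive.NewFileTarget("./var/parquet-archive")
+	if err != nil {
+		return err
+	}
+
+	// 512 matches the CLI default for --max-file-size (MB per parquet file).
+	cw := pqarchive.NewClusterAwareParquetWriter(target, 512)
+
+	row, err := pqarchive.JobToParquetRow(meta, data)
+	if err != nil {
+		return err
+	}
+	if err := cw.AddJob(*row); err != nil {
+		return err
+	}
+
+	// Close finalizes the writer; the CLI treats a failure here as fatal.
+	return cw.Close()
+}
+```
+
+Reading back works the same way in reverse: `ReadParquetFile` returns the rows
+of a parquet file and `ParquetRowToJob` restores the job metadata and metric
+data.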
+ +### S3 Source/Destination Example + +Both conversion directions support S3: + +```bash +# JSON (S3) -> Parquet (local) +./archive-manager --convert --format parquet \ + --src-config '{"kind":"s3","endpoint":"https://s3.example.com","bucket":"json-archive","accessKey":"...","secretKey":"..."}' \ + --dst-config '{"kind":"file","path":"./var/parquet-archive"}' + +# Parquet (local) -> JSON (S3) +./archive-manager --convert --format json \ + --src-config '{"kind":"file","path":"./var/parquet-archive"}' \ + --dst-config '{"kind":"s3","endpoint":"https://s3.example.com","bucket":"json-archive","access-key":"...","secret-key":"..."}' +``` + +## Command-Line Options + +| Flag | Default | Description | +|------|---------|-------------| +| `-s` | `./var/job-archive` | Source job archive path (for info/validate/remove modes) | +| `--config` | `./config.json` | Path to config.json | +| `--loglevel` | `info` | Logging level: debug, info, warn, err, fatal, crit | +| `--logdate` | `false` | Add timestamps to log messages | +| `--validate` | `false` | Validate archive against JSON schema | +| `--remove-before` | | Remove jobs started before date (Format: 2006-Jan-02) | +| `--remove-after` | | Remove jobs started after date (Format: 2006-Jan-02) | +| `--import` | `false` | Import jobs between archive backends | +| `--convert` | `false` | Convert archive between JSON and Parquet formats | +| `--format` | `json` | Output format for conversion: `json` or `parquet` | +| `--max-file-size` | `512` | Max parquet file size in MB (only for parquet output) | +| `--src-config` | | Source config JSON (required for import/convert) | +| `--dst-config` | | Destination config JSON (required for import/convert) | + +## Parquet Archive Layout + +When converting to Parquet, the output is organized by cluster: + +``` +parquet-archive/ + clusterA/ + cluster.json + cc-archive-2025-01-20-001.parquet + cc-archive-2025-01-20-002.parquet + clusterB/ + cluster.json + cc-archive-2025-01-20-001.parquet +``` + +Each parquet file contains job metadata and gzip-compressed metric data. The `cluster.json` file preserves the cluster configuration from the source archive. + +## Round-Trip Conversion + +Archives can be converted from JSON to Parquet and back without data loss: + +```bash +# Original JSON archive +./archive-manager --convert --format parquet \ + --src-config '{"kind":"file","path":"./var/job-archive"}' \ + --dst-config '{"kind":"file","path":"./var/parquet-archive"}' + +# Convert back to JSON +./archive-manager --convert --format json \ + --src-config '{"kind":"file","path":"./var/parquet-archive"}' \ + --dst-config '{"kind":"file","path":"./var/json-archive"}' +``` diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 918fc7c8..4a9094c0 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -23,6 +23,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" + pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet" ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" ) @@ -372,10 +373,207 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend, srcConfig stri return finalImported, finalFailed, nil } +// parseSourceConfig parses the common kind/path/s3 fields from a config JSON string. 
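+// sourceConfig is the struct those fields are decoded into; in convert mode the
+// --src-config and --dst-config JSON strings are both unmarshalled into it.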
+type sourceConfig struct { + Kind string `json:"kind"` + Path string `json:"path"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` + Region string `json:"region"` + UsePathStyle bool `json:"usePathStyle"` +} + +// createParquetTarget creates a ParquetTarget from a parsed config. +func createParquetTarget(cfg sourceConfig) (pqarchive.ParquetTarget, error) { + switch cfg.Kind { + case "s3": + return pqarchive.NewS3Target(pqarchive.S3TargetConfig{ + Endpoint: cfg.Endpoint, + Bucket: cfg.Bucket, + AccessKey: cfg.AccessKey, + SecretKey: cfg.SecretKey, + Region: cfg.Region, + UsePathStyle: cfg.UsePathStyle, + }) + default: + return pqarchive.NewFileTarget(cfg.Path) + } +} + +// createParquetSource creates a ParquetSource from a parsed config. +func createParquetSource(cfg sourceConfig) (pqarchive.ParquetSource, error) { + switch cfg.Kind { + case "s3": + return pqarchive.NewS3ParquetSource(pqarchive.S3TargetConfig{ + Endpoint: cfg.Endpoint, + Bucket: cfg.Bucket, + AccessKey: cfg.AccessKey, + SecretKey: cfg.SecretKey, + Region: cfg.Region, + UsePathStyle: cfg.UsePathStyle, + }) + default: + if cfg.Path == "" { + return nil, fmt.Errorf("file source: path is required") + } + return pqarchive.NewFileParquetSource(cfg.Path), nil + } +} + +// convertJSONToParquet converts a JSON archive backend to parquet format. +func convertJSONToParquet(srcBackend archive.ArchiveBackend, dstCfg sourceConfig, maxSizeMB int) error { + target, err := createParquetTarget(dstCfg) + if err != nil { + return fmt.Errorf("create parquet target: %w", err) + } + + cw := pqarchive.NewClusterAwareParquetWriter(target, maxSizeMB) + + // Transfer cluster configs + for _, clusterName := range srcBackend.GetClusters() { + clusterCfg, err := srcBackend.LoadClusterCfg(clusterName) + if err != nil { + cclog.Warnf("Convert: load cluster config %q: %v", clusterName, err) + continue + } + cw.SetClusterConfig(clusterName, clusterCfg) + } + + converted := 0 + failed := 0 + startTime := time.Now() + + for job := range srcBackend.Iter(true) { + if job.Meta == nil { + cclog.Warn("Skipping job with nil metadata") + failed++ + continue + } + if job.Data == nil { + cclog.Warnf("Job %d has no metric data, skipping", job.Meta.JobID) + failed++ + continue + } + + row, err := pqarchive.JobToParquetRow(job.Meta, job.Data) + if err != nil { + cclog.Warnf("Convert job %d: %v", job.Meta.JobID, err) + failed++ + continue + } + if err := cw.AddJob(*row); err != nil { + cclog.Errorf("Add job %d to writer: %v", job.Meta.JobID, err) + failed++ + continue + } + converted++ + + if converted%1000 == 0 { + cclog.Infof("Converted %d jobs so far...", converted) + } + } + + if err := cw.Close(); err != nil { + return fmt.Errorf("close parquet writer: %w", err) + } + + elapsed := time.Since(startTime) + cclog.Infof("JSON->Parquet conversion completed in %s: %d jobs converted, %d failed", + formatDuration(elapsed), converted, failed) + return nil +} + +// convertParquetToJSON converts a parquet archive to a JSON archive backend. 
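+// Cluster configs found alongside the parquet files are copied into the
+// destination backend first; jobs that already exist there are skipped.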
+func convertParquetToJSON(srcCfg sourceConfig, dstBackend archive.ArchiveBackend) error { + src, err := createParquetSource(srcCfg) + if err != nil { + return fmt.Errorf("create parquet source: %w", err) + } + + clusters, err := src.GetClusters() + if err != nil { + return fmt.Errorf("list clusters: %w", err) + } + + converted := 0 + failed := 0 + skipped := 0 + startTime := time.Now() + + for _, cluster := range clusters { + // Transfer cluster config + clusterCfg, err := src.ReadClusterConfig(cluster) + if err != nil { + cclog.Warnf("Convert: read cluster config %q: %v", cluster, err) + } else { + if err := dstBackend.StoreClusterCfg(cluster, clusterCfg); err != nil { + cclog.Warnf("Convert: store cluster config %q: %v", cluster, err) + } else { + cclog.Infof("Imported cluster config for %s", cluster) + } + } + + // Read and convert parquet files + files, err := src.ListParquetFiles(cluster) + if err != nil { + cclog.Errorf("Convert: list parquet files for %q: %v", cluster, err) + continue + } + + for _, file := range files { + data, err := src.ReadFile(file) + if err != nil { + cclog.Errorf("Convert: read file %q: %v", file, err) + failed++ + continue + } + + rows, err := pqarchive.ReadParquetFile(data) + if err != nil { + cclog.Errorf("Convert: parse parquet file %q: %v", file, err) + failed++ + continue + } + + cclog.Infof("Processing %s: %d jobs", file, len(rows)) + + for _, row := range rows { + meta, jobData, err := pqarchive.ParquetRowToJob(&row) + if err != nil { + cclog.Warnf("Convert row to job: %v", err) + failed++ + continue + } + + if dstBackend.Exists(meta) { + skipped++ + continue + } + + if err := dstBackend.ImportJob(meta, jobData); err != nil { + cclog.Warnf("Import job %d: %v", meta.JobID, err) + failed++ + continue + } + converted++ + } + } + } + + elapsed := time.Since(startTime) + cclog.Infof("Parquet->JSON conversion completed in %s: %d jobs converted, %d skipped, %d failed", + formatDuration(elapsed), converted, skipped, failed) + return nil +} + func main() { var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string var flagSrcConfig, flagDstConfig string - var flagLogDateTime, flagValidate, flagImport bool + var flagLogDateTime, flagValidate, flagImport, flagConvert bool + var flagFormat string + var flagMaxFileSize int flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") @@ -386,6 +584,9 @@ func main() { flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date (Format: 2006-Jan-04)") flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") flag.BoolVar(&flagImport, "import", false, "Import jobs from source archive to destination archive") + flag.BoolVar(&flagConvert, "convert", false, "Convert archive between JSON and Parquet formats") + flag.StringVar(&flagFormat, "format", "json", "Output format for conversion: 'json' or 'parquet'") + flag.IntVar(&flagMaxFileSize, "max-file-size", 512, "Max parquet file size in MB (only for parquet output)") flag.StringVar(&flagSrcConfig, "src-config", "", "Source archive backend configuration (JSON), e.g. '{\"kind\":\"file\",\"path\":\"./archive\"}'") flag.StringVar(&flagDstConfig, "dst-config", "", "Destination archive backend configuration (JSON), e.g. 
'{\"kind\":\"sqlite\",\"dbPath\":\"./archive.db\"}'") flag.Parse() @@ -429,6 +630,49 @@ func main() { os.Exit(0) } + // Handle convert mode + if flagConvert { + if flagSrcConfig == "" || flagDstConfig == "" { + cclog.Fatal("Both --src-config and --dst-config must be specified for convert mode") + } + + var srcCfg, dstCfg sourceConfig + if err := json.Unmarshal([]byte(flagSrcConfig), &srcCfg); err != nil { + cclog.Fatalf("Failed to parse source config: %s", err.Error()) + } + if err := json.Unmarshal([]byte(flagDstConfig), &dstCfg); err != nil { + cclog.Fatalf("Failed to parse destination config: %s", err.Error()) + } + + switch flagFormat { + case "parquet": + // JSON archive -> Parquet: source is an archive backend + cclog.Info("Convert mode: JSON -> Parquet") + srcBackend, err := archive.InitBackend(json.RawMessage(flagSrcConfig)) + if err != nil { + cclog.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + if err := convertJSONToParquet(srcBackend, dstCfg, flagMaxFileSize); err != nil { + cclog.Fatalf("Conversion failed: %s", err.Error()) + } + case "json": + // Parquet -> JSON archive: destination is an archive backend + cclog.Info("Convert mode: Parquet -> JSON") + dstBackend, err := archive.InitBackend(json.RawMessage(flagDstConfig)) + if err != nil { + cclog.Fatalf("Failed to initialize destination backend: %s", err.Error()) + } + if err := convertParquetToJSON(srcCfg, dstBackend); err != nil { + cclog.Fatalf("Conversion failed: %s", err.Error()) + } + default: + cclog.Fatalf("Unknown format %q: must be 'json' or 'parquet'", flagFormat) + } + + cclog.Info("Conversion finished successfully") + os.Exit(0) + } + ccconf.Init(flagConfigFile) // Load and check main configuration