From adebffd2515541da99098dea0bf03fd5ad789935 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 27 Feb 2026 17:40:32 +0100 Subject: [PATCH] Replace the old zip archive options for the metricstore node data by parquet files --- pkg/metricstore/archive.go | 221 +++++++++++++-------- pkg/metricstore/parquetArchive.go | 213 +++++++++++++++++++++ pkg/metricstore/parquetArchive_test.go | 255 +++++++++++++++++++++++++ 3 files changed, 606 insertions(+), 83 deletions(-) create mode 100644 pkg/metricstore/parquetArchive.go create mode 100644 pkg/metricstore/parquetArchive_test.go diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index d3617f2c..77f4264a 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -6,12 +6,9 @@ package metricstore import ( - "archive/zip" - "bufio" "context" "errors" "fmt" - "io" "os" "path/filepath" "sync" @@ -47,7 +44,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) { } } -// runWorker takes simple values to configure what it does +// cleanUpWorker takes simple values to configure what it does func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { wg.Go(func() { @@ -75,10 +72,10 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod if err != nil { cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, err.Error()) } else { - if delete && cleanupDir == "" { + if delete { cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", n) } else { - cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n) + cclog.Infof("[METRICSTORE]> done: %d checkpoint files archived to parquet", n) } } } @@ -88,17 +85,26 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod var ErrNoNewArchiveData error = errors.New("all data already archived") -// Delete or ZIP all checkpoint files older than `from` together and write them to the `cleanupDir`, -// deleting/moving them from the `checkpointsDir`. +// CleanupCheckpoints deletes or archives all checkpoint files older than `from`. +// When archiving, consolidates all hosts per cluster into a single Parquet file. func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error) { + if deleteInstead { + return deleteCheckpoints(checkpointsDir, from) + } + + return archiveCheckpoints(checkpointsDir, cleanupDir, from) +} + +// deleteCheckpoints removes checkpoint files older than `from` across all clusters/hosts. +func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { entries1, err := os.ReadDir(checkpointsDir) if err != nil { return 0, err } type workItem struct { - cdir, adir string - cluster, host string + dir string + cluster, host string } var wg sync.WaitGroup @@ -109,13 +115,29 @@ func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteIns for worker := 0; worker < Keys.NumWorkers; worker++ { go func() { defer wg.Done() - for workItem := range work { - m, err := cleanupCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead) + for item := range work { + entries, err := os.ReadDir(item.dir) if err != nil { - cclog.Errorf("error while archiving %s/%s: %s", workItem.cluster, workItem.host, err.Error()) + cclog.Errorf("error reading %s/%s: %s", item.cluster, item.host, err.Error()) atomic.AddInt32(&errs, 1) + continue + } + + files, err := findFiles(entries, from, false) + if err != nil { + cclog.Errorf("error finding files in %s/%s: %s", item.cluster, item.host, err.Error()) + atomic.AddInt32(&errs, 1) + continue + } + + for _, checkpoint := range files { + if err := os.Remove(filepath.Join(item.dir, checkpoint)); err != nil { + cclog.Errorf("error deleting %s/%s/%s: %s", item.cluster, item.host, checkpoint, err.Error()) + atomic.AddInt32(&errs, 1) + } else { + atomic.AddInt32(&n, 1) + } } - atomic.AddInt32(&n, int32(m)) } }() } @@ -124,14 +146,14 @@ func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteIns entries2, e := os.ReadDir(filepath.Join(checkpointsDir, de1.Name())) if e != nil { err = e + continue } for _, de2 := range entries2 { - cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name()) - adir := filepath.Join(cleanupDir, de1.Name(), de2.Name()) work <- workItem{ - adir: adir, cdir: cdir, - cluster: de1.Name(), host: de2.Name(), + dir: filepath.Join(checkpointsDir, de1.Name(), de2.Name()), + cluster: de1.Name(), + host: de2.Name(), } } } @@ -142,85 +164,118 @@ func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteIns if err != nil { return int(n), err } - if errs > 0 { - return int(n), fmt.Errorf("%d errors happened while archiving (%d successes)", errs, n) + return int(n), fmt.Errorf("%d errors happened while deleting (%d successes)", errs, n) } return int(n), nil } -// Helper function for `CleanupCheckpoints`. -func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead bool) (int, error) { - entries, err := os.ReadDir(dir) +// archiveCheckpoints archives checkpoint files to Parquet format. +// Produces one Parquet file per cluster: //.parquet +func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { + clusterEntries, err := os.ReadDir(checkpointsDir) if err != nil { return 0, err } - files, err := findFiles(entries, from, false) - if err != nil { - return 0, err - } + totalFiles := 0 - if deleteInstead { - n := 0 - for _, checkpoint := range files { - filename := filepath.Join(dir, checkpoint) - if err = os.Remove(filename); err != nil { - return n, err - } - n += 1 + for _, clusterEntry := range clusterEntries { + if !clusterEntry.IsDir() { + continue } - return n, nil - } - filename := filepath.Join(cleanupDir, fmt.Sprintf("%d.zip", from)) - f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) - if err != nil && os.IsNotExist(err) { - err = os.MkdirAll(cleanupDir, CheckpointDirPerms) - if err == nil { - f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) - } - } - if err != nil { - return 0, err - } - defer f.Close() - bw := bufio.NewWriter(f) - defer bw.Flush() - zw := zip.NewWriter(bw) - defer zw.Close() - - n := 0 - for _, checkpoint := range files { - // Use closure to ensure file is closed immediately after use, - // avoiding file descriptor leak from defer in loop - err := func() error { - filename := filepath.Join(dir, checkpoint) - r, err := os.Open(filename) - if err != nil { - return err - } - defer r.Close() - - w, err := zw.Create(checkpoint) - if err != nil { - return err - } - - if _, err = io.Copy(w, r); err != nil { - return err - } - - if err = os.Remove(filename); err != nil { - return err - } - return nil - }() + cluster := clusterEntry.Name() + hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster)) if err != nil { - return n, err + return totalFiles, err } - n += 1 + + // Collect rows from all hosts in this cluster using worker pool + type hostResult struct { + rows []ParquetMetricRow + files []string // checkpoint filenames to delete after successful write + dir string // checkpoint directory for this host + } + + results := make(chan hostResult, len(hostEntries)) + work := make(chan struct { + dir, host string + }, Keys.NumWorkers) + + var wg sync.WaitGroup + errs := int32(0) + + wg.Add(Keys.NumWorkers) + for w := 0; w < Keys.NumWorkers; w++ { + go func() { + defer wg.Done() + for item := range work { + rows, files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from) + if err != nil { + cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error()) + atomic.AddInt32(&errs, 1) + continue + } + if len(rows) > 0 { + results <- hostResult{rows: rows, files: files, dir: item.dir} + } + } + }() + } + + go func() { + for _, hostEntry := range hostEntries { + if !hostEntry.IsDir() { + continue + } + dir := filepath.Join(checkpointsDir, cluster, hostEntry.Name()) + work <- struct { + dir, host string + }{dir: dir, host: hostEntry.Name()} + } + close(work) + wg.Wait() + close(results) + }() + + // Collect all rows and file info + var allRows []ParquetMetricRow + var allResults []hostResult + for r := range results { + allRows = append(allRows, r.rows...) + allResults = append(allResults, r) + } + + if errs > 0 { + return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) + } + + if len(allRows) == 0 { + continue + } + + // Write one Parquet file per cluster + parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) + if err := writeParquetArchive(parquetFile, allRows); err != nil { + return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err) + } + + // Delete archived checkpoint files + for _, result := range allResults { + for _, file := range result.files { + filename := filepath.Join(result.dir, file) + if err := os.Remove(filename); err != nil { + cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err) + } else { + totalFiles++ + } + } + } + + cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s", + len(allRows), totalFiles, cluster, parquetFile) } - return n, nil + return totalFiles, nil } diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go new file mode 100644 index 00000000..420ee4e5 --- /dev/null +++ b/pkg/metricstore/parquetArchive.go @@ -0,0 +1,213 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package metricstore + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "fmt" + "os" + "path/filepath" + + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + pq "github.com/parquet-go/parquet-go" +) + +// ParquetMetricRow is the long-format schema for archived metric data. +// One row per (host, metric, scope, scope_id, timestamp) data point. +// Sorted by (cluster, hostname, metric, timestamp) for optimal compression. +type ParquetMetricRow struct { + Cluster string `parquet:"cluster"` + Hostname string `parquet:"hostname"` + Metric string `parquet:"metric"` + Scope string `parquet:"scope"` + ScopeID string `parquet:"scope_id"` + Timestamp int64 `parquet:"timestamp"` + Frequency int64 `parquet:"frequency"` + Value float32 `parquet:"value"` +} + +// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows. +// The scope path is built from the hierarchy: host level is "node", then child names +// map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0"). +func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rows []ParquetMetricRow) []ParquetMetricRow { + for metricName, cm := range cf.Metrics { + ts := cm.Start + for _, v := range cm.Data { + if !v.IsNaN() { + rows = append(rows, ParquetMetricRow{ + Cluster: cluster, + Hostname: hostname, + Metric: metricName, + Scope: scope, + ScopeID: scopeID, + Timestamp: ts, + Frequency: cm.Frequency, + Value: float32(v), + }) + } + ts += cm.Frequency + } + } + + for childName, childCf := range cf.Children { + childScope, childScopeID := parseScopeFromName(childName) + rows = flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rows) + } + + return rows +} + +// parseScopeFromName infers scope and scope_id from a child level name. +// Examples: "socket0" → ("socket", "0"), "core12" → ("core", "12"), +// "a0" (accelerator) → ("accelerator", "0"). +// If the name doesn't match known patterns, it's used as-is for scope with empty scope_id. +func parseScopeFromName(name string) (string, string) { + prefixes := []struct { + prefix string + scope string + }{ + {"socket", "socket"}, + {"memoryDomain", "memoryDomain"}, + {"core", "core"}, + {"hwthread", "hwthread"}, + {"cpu", "hwthread"}, + {"accelerator", "accelerator"}, + } + + for _, p := range prefixes { + if len(name) > len(p.prefix) && name[:len(p.prefix)] == p.prefix { + id := name[len(p.prefix):] + if len(id) > 0 && id[0] >= '0' && id[0] <= '9' { + return p.scope, id + } + } + } + + return name, "" +} + +// writeParquetArchive writes rows to a Parquet file with Zstd compression. +func writeParquetArchive(filename string, rows []ParquetMetricRow) error { + if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { + return fmt.Errorf("creating archive directory: %w", err) + } + + f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + if err != nil { + return fmt.Errorf("creating parquet file: %w", err) + } + defer f.Close() + + bw := bufio.NewWriterSize(f, 1<<20) // 1MB write buffer + + writer := pq.NewGenericWriter[ParquetMetricRow](bw, + pq.Compression(&pq.Zstd), + pq.SortingWriterConfig(pq.SortingColumns( + pq.Ascending("cluster"), + pq.Ascending("hostname"), + pq.Ascending("metric"), + pq.Ascending("timestamp"), + )), + ) + + if _, err := writer.Write(rows); err != nil { + return fmt.Errorf("writing parquet rows: %w", err) + } + + if err := writer.Close(); err != nil { + return fmt.Errorf("closing parquet writer: %w", err) + } + + if err := bw.Flush(); err != nil { + return fmt.Errorf("flushing parquet file: %w", err) + } + + return nil +} + +// loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns +// a CheckpointFile. Used by the Parquet archiver to read checkpoint data +// before converting it to Parquet format. +func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + + ext := filepath.Ext(filename) + switch ext { + case ".json": + cf := &CheckpointFile{} + br := bufio.NewReader(f) + if err := json.NewDecoder(br).Decode(cf); err != nil { + return nil, fmt.Errorf("decoding JSON checkpoint %s: %w", filename, err) + } + return cf, nil + + case ".bin": + br := bufio.NewReader(f) + var magic uint32 + if err := binary.Read(br, binary.LittleEndian, &magic); err != nil { + return nil, fmt.Errorf("reading magic from %s: %w", filename, err) + } + if magic != snapFileMagic { + return nil, fmt.Errorf("invalid snapshot magic in %s: 0x%08X", filename, magic) + } + var fileFrom, fileTo int64 + if err := binary.Read(br, binary.LittleEndian, &fileFrom); err != nil { + return nil, fmt.Errorf("reading from-timestamp from %s: %w", filename, err) + } + if err := binary.Read(br, binary.LittleEndian, &fileTo); err != nil { + return nil, fmt.Errorf("reading to-timestamp from %s: %w", filename, err) + } + cf, err := readBinaryLevel(br) + if err != nil { + return nil, fmt.Errorf("reading binary level from %s: %w", filename, err) + } + cf.From = fileFrom + cf.To = fileTo + return cf, nil + + default: + return nil, fmt.Errorf("unsupported checkpoint extension: %s", ext) + } +} + +// archiveCheckpointsToParquet reads checkpoint files for a host directory, +// converts them to Parquet rows. Returns the rows and filenames that were processed. +func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return nil, nil, err + } + + files, err := findFiles(entries, from, false) + if err != nil { + return nil, nil, err + } + + if len(files) == 0 { + return nil, nil, nil + } + + var rows []ParquetMetricRow + + for _, checkpoint := range files { + filename := filepath.Join(dir, checkpoint) + cf, err := loadCheckpointFileFromDisk(filename) + if err != nil { + cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err) + continue + } + + rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows) + } + + return rows, files, nil +} diff --git a/pkg/metricstore/parquetArchive_test.go b/pkg/metricstore/parquetArchive_test.go new file mode 100644 index 00000000..d3d70c02 --- /dev/null +++ b/pkg/metricstore/parquetArchive_test.go @@ -0,0 +1,255 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package metricstore + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/ClusterCockpit/cc-lib/v2/schema" + pq "github.com/parquet-go/parquet-go" +) + +func TestParseScopeFromName(t *testing.T) { + tests := []struct { + name string + wantScope string + wantID string + }{ + {"socket0", "socket", "0"}, + {"socket12", "socket", "12"}, + {"core0", "core", "0"}, + {"core127", "core", "127"}, + {"cpu0", "hwthread", "0"}, + {"hwthread5", "hwthread", "5"}, + {"memoryDomain0", "memoryDomain", "0"}, + {"accelerator0", "accelerator", "0"}, + {"unknown", "unknown", ""}, + {"socketX", "socketX", ""}, // not numeric suffix + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scope, id := parseScopeFromName(tt.name) + if scope != tt.wantScope || id != tt.wantID { + t.Errorf("parseScopeFromName(%q) = (%q, %q), want (%q, %q)", + tt.name, scope, id, tt.wantScope, tt.wantID) + } + }) + } +} + +func TestFlattenCheckpointFile(t *testing.T) { + cf := &CheckpointFile{ + From: 1000, + To: 1060, + Metrics: map[string]*CheckpointMetrics{ + "cpu_load": { + Frequency: 60, + Start: 1000, + Data: []schema.Float{0.5, 0.7, schema.NaN}, + }, + }, + Children: map[string]*CheckpointFile{ + "socket0": { + Metrics: map[string]*CheckpointMetrics{ + "mem_bw": { + Frequency: 60, + Start: 1000, + Data: []schema.Float{100.0, schema.NaN, 200.0}, + }, + }, + Children: make(map[string]*CheckpointFile), + }, + }, + } + + rows := flattenCheckpointFile(cf, "fritz", "node001", "node", "", nil) + + // cpu_load: 2 non-NaN values at node scope + // mem_bw: 2 non-NaN values at socket0 scope + if len(rows) != 4 { + t.Fatalf("expected 4 rows, got %d", len(rows)) + } + + // Verify a node-scope row + found := false + for _, r := range rows { + if r.Metric == "cpu_load" && r.Timestamp == 1000 { + found = true + if r.Cluster != "fritz" || r.Hostname != "node001" || r.Scope != "node" || r.Value != 0.5 { + t.Errorf("unexpected row: %+v", r) + } + } + } + if !found { + t.Error("expected cpu_load row at timestamp 1000") + } + + // Verify a socket-scope row + found = false + for _, r := range rows { + if r.Metric == "mem_bw" && r.Scope == "socket" && r.ScopeID == "0" { + found = true + } + } + if !found { + t.Error("expected mem_bw row with scope=socket, scope_id=0") + } +} + +func TestParquetArchiveRoundtrip(t *testing.T) { + tmpDir := t.TempDir() + + // Create checkpoint files on disk (JSON format) + cpDir := filepath.Join(tmpDir, "checkpoints", "testcluster", "node001") + if err := os.MkdirAll(cpDir, 0o755); err != nil { + t.Fatal(err) + } + + cf := &CheckpointFile{ + From: 1000, + To: 1180, + Metrics: map[string]*CheckpointMetrics{ + "cpu_load": { + Frequency: 60, + Start: 1000, + Data: []schema.Float{0.5, 0.7, 0.9}, + }, + "mem_used": { + Frequency: 60, + Start: 1000, + Data: []schema.Float{45.0, 46.0, 47.0}, + }, + }, + Children: map[string]*CheckpointFile{ + "socket0": { + Metrics: map[string]*CheckpointMetrics{ + "mem_bw": { + Frequency: 60, + Start: 1000, + Data: []schema.Float{100.0, 110.0, 120.0}, + }, + }, + Children: make(map[string]*CheckpointFile), + }, + }, + } + + // Write JSON checkpoint + cpFile := filepath.Join(cpDir, "1000.json") + data, err := json.Marshal(cf) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(cpFile, data, 0o644); err != nil { + t.Fatal(err) + } + + // Archive to Parquet + archiveDir := filepath.Join(tmpDir, "archive") + rows, files, err := archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000) + if err != nil { + t.Fatal(err) + } + if len(files) != 1 || files[0] != "1000.json" { + t.Fatalf("expected 1 file, got %v", files) + } + + parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") + if err := writeParquetArchive(parquetFile, rows); err != nil { + t.Fatal(err) + } + + // Read back and verify + f, err := os.Open(parquetFile) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + stat, _ := f.Stat() + pf, err := pq.OpenFile(f, stat.Size()) + if err != nil { + t.Fatal(err) + } + + reader := pq.NewGenericReader[ParquetMetricRow](pf) + readRows := make([]ParquetMetricRow, 100) + n, err := reader.Read(readRows) + if err != nil && n == 0 { + t.Fatal(err) + } + readRows = readRows[:n] + reader.Close() + + // We expect: cpu_load(3) + mem_used(3) + mem_bw(3) = 9 rows + if n != 9 { + t.Fatalf("expected 9 rows in parquet file, got %d", n) + } + + // Verify cluster and hostname are set correctly + for _, r := range readRows { + if r.Cluster != "testcluster" { + t.Errorf("expected cluster=testcluster, got %s", r.Cluster) + } + if r.Hostname != "node001" { + t.Errorf("expected hostname=node001, got %s", r.Hostname) + } + } + + // Verify parquet file is smaller than JSON (compression working) + if stat.Size() == 0 { + t.Error("parquet file is empty") + } + + t.Logf("Parquet file size: %d bytes for %d rows", stat.Size(), n) +} + +func TestLoadCheckpointFileFromDisk_JSON(t *testing.T) { + tmpDir := t.TempDir() + + cf := &CheckpointFile{ + From: 1000, + To: 1060, + Metrics: map[string]*CheckpointMetrics{ + "test_metric": { + Frequency: 60, + Start: 1000, + Data: []schema.Float{1.0, 2.0, 3.0}, + }, + }, + Children: make(map[string]*CheckpointFile), + } + + filename := filepath.Join(tmpDir, "1000.json") + data, err := json.Marshal(cf) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filename, data, 0o644); err != nil { + t.Fatal(err) + } + + loaded, err := loadCheckpointFileFromDisk(filename) + if err != nil { + t.Fatal(err) + } + + if loaded.From != 1000 || loaded.To != 1060 { + t.Errorf("expected From=1000, To=1060, got From=%d, To=%d", loaded.From, loaded.To) + } + + m, ok := loaded.Metrics["test_metric"] + if !ok { + t.Fatal("expected test_metric in loaded checkpoint") + } + if m.Frequency != 60 || m.Start != 1000 || len(m.Data) != 3 { + t.Errorf("unexpected metric data: %+v", m) + } +}