mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-21 07:17:30 +01:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a4c4d8e57 | ||
| 032d1e0692 | |||
| 30af428d80 |
@@ -43,7 +43,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
// cleanUpWorker takes simple values to configure what it does
|
// cleanUpWorker takes simple values to configure what it does
|
||||||
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
|
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
|
|
||||||
d, err := time.ParseDuration(interval)
|
d, err := time.ParseDuration(interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
|
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
|
||||||
@@ -99,8 +98,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type workItem struct {
|
type workItem struct {
|
||||||
dir string
|
dir string
|
||||||
cluster, host string
|
cluster, host string
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -187,9 +186,8 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
return totalFiles, err
|
return totalFiles, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect rows from all hosts in this cluster using worker pool
|
// Collect files to delete from all hosts in this cluster using worker pool
|
||||||
type hostResult struct {
|
type hostResult struct {
|
||||||
rows []ParquetMetricRow
|
|
||||||
files []string // checkpoint filenames to delete after successful write
|
files []string // checkpoint filenames to delete after successful write
|
||||||
dir string // checkpoint directory for this host
|
dir string // checkpoint directory for this host
|
||||||
}
|
}
|
||||||
@@ -199,6 +197,8 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
dir, host string
|
dir, host string
|
||||||
}, Keys.NumWorkers)
|
}, Keys.NumWorkers)
|
||||||
|
|
||||||
|
rowChan := make(chan *ParquetMetricRow, 10000)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
errs := int32(0)
|
errs := int32(0)
|
||||||
|
|
||||||
@@ -207,19 +207,20 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for item := range work {
|
for item := range work {
|
||||||
rows, files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from)
|
files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from, rowChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error())
|
cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error())
|
||||||
atomic.AddInt32(&errs, 1)
|
atomic.AddInt32(&errs, 1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(rows) > 0 {
|
if len(files) > 0 {
|
||||||
results <- hostResult{rows: rows, files: files, dir: item.dir}
|
results <- hostResult{files: files, dir: item.dir}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Produce work items
|
||||||
go func() {
|
go func() {
|
||||||
for _, hostEntry := range hostEntries {
|
for _, hostEntry := range hostEntries {
|
||||||
if !hostEntry.IsDir() {
|
if !hostEntry.IsDir() {
|
||||||
@@ -231,15 +232,22 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
}{dir: dir, host: hostEntry.Name()}
|
}{dir: dir, host: hostEntry.Name()}
|
||||||
}
|
}
|
||||||
close(work)
|
close(work)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for all workers and close rowChan and results
|
||||||
|
go func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(rowChan)
|
||||||
close(results)
|
close(results)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Collect all rows and file info
|
// Concurrently write from rowChan to Parquet
|
||||||
var allRows []ParquetMetricRow
|
parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from))
|
||||||
|
rowCount, writerErr := writeParquetArchiveStream(parquetFile, rowChan)
|
||||||
|
|
||||||
|
// Collect all file info
|
||||||
var allResults []hostResult
|
var allResults []hostResult
|
||||||
for r := range results {
|
for r := range results {
|
||||||
allRows = append(allRows, r.rows...)
|
|
||||||
allResults = append(allResults, r)
|
allResults = append(allResults, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -247,17 +255,18 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster)
|
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(allRows) == 0 {
|
if writerErr != nil {
|
||||||
|
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writerErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rowCount == 0 {
|
||||||
|
// Cleanup empty parquet file if created
|
||||||
|
os.Remove(parquetFile)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write one Parquet file per cluster
|
|
||||||
parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from))
|
|
||||||
if err := writeParquetArchive(parquetFile, allRows); err != nil {
|
|
||||||
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete archived checkpoint files
|
// Delete archived checkpoint files
|
||||||
|
totalFilesCluster := 0
|
||||||
for _, result := range allResults {
|
for _, result := range allResults {
|
||||||
for _, file := range result.files {
|
for _, file := range result.files {
|
||||||
filename := filepath.Join(result.dir, file)
|
filename := filepath.Join(result.dir, file)
|
||||||
@@ -265,12 +274,13 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err)
|
cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err)
|
||||||
} else {
|
} else {
|
||||||
totalFiles++
|
totalFiles++
|
||||||
|
totalFilesCluster++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s",
|
cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s",
|
||||||
len(allRows), totalFiles, cluster, parquetFile)
|
rowCount, totalFilesCluster, cluster, parquetFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
return totalFiles, nil
|
return totalFiles, nil
|
||||||
|
|||||||
@@ -31,15 +31,15 @@ type ParquetMetricRow struct {
|
|||||||
Value float32 `parquet:"value"`
|
Value float32 `parquet:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows.
|
// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows via a channel.
|
||||||
// The scope path is built from the hierarchy: host level is "node", then child names
|
// The scope path is built from the hierarchy: host level is "node", then child names
|
||||||
// map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0").
|
// map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0").
|
||||||
func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rows []ParquetMetricRow) []ParquetMetricRow {
|
func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rowChan chan<- *ParquetMetricRow) {
|
||||||
for metricName, cm := range cf.Metrics {
|
for metricName, cm := range cf.Metrics {
|
||||||
ts := cm.Start
|
ts := cm.Start
|
||||||
for _, v := range cm.Data {
|
for _, v := range cm.Data {
|
||||||
if !v.IsNaN() {
|
if !v.IsNaN() {
|
||||||
rows = append(rows, ParquetMetricRow{
|
rowChan <- &ParquetMetricRow{
|
||||||
Cluster: cluster,
|
Cluster: cluster,
|
||||||
Hostname: hostname,
|
Hostname: hostname,
|
||||||
Metric: metricName,
|
Metric: metricName,
|
||||||
@@ -48,7 +48,7 @@ func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID
|
|||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
Frequency: cm.Frequency,
|
Frequency: cm.Frequency,
|
||||||
Value: float32(v),
|
Value: float32(v),
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
ts += cm.Frequency
|
ts += cm.Frequency
|
||||||
}
|
}
|
||||||
@@ -56,10 +56,8 @@ func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID
|
|||||||
|
|
||||||
for childName, childCf := range cf.Children {
|
for childName, childCf := range cf.Children {
|
||||||
childScope, childScopeID := parseScopeFromName(childName)
|
childScope, childScopeID := parseScopeFromName(childName)
|
||||||
rows = flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rows)
|
flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rowChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
return rows
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseScopeFromName infers scope and scope_id from a child level name.
|
// parseScopeFromName infers scope and scope_id from a child level name.
|
||||||
@@ -91,15 +89,23 @@ func parseScopeFromName(name string) (string, string) {
|
|||||||
return name, ""
|
return name, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeParquetArchive writes rows to a Parquet file with Zstd compression.
|
// writeParquetArchiveStream writes rows from a channel to a Parquet file with Zstd compression in batches.
|
||||||
func writeParquetArchive(filename string, rows []ParquetMetricRow) error {
|
func writeParquetArchiveStream(filename string, rowChan <-chan *ParquetMetricRow) (int, error) {
|
||||||
if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil {
|
if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil {
|
||||||
return fmt.Errorf("creating archive directory: %w", err)
|
go func() {
|
||||||
|
for range rowChan {
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return 0, fmt.Errorf("creating archive directory: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("creating parquet file: %w", err)
|
go func() {
|
||||||
|
for range rowChan {
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return 0, fmt.Errorf("creating parquet file: %w", err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
@@ -115,19 +121,45 @@ func writeParquetArchive(filename string, rows []ParquetMetricRow) error {
|
|||||||
)),
|
)),
|
||||||
)
|
)
|
||||||
|
|
||||||
if _, err := writer.Write(rows); err != nil {
|
batchSize := 4096
|
||||||
return fmt.Errorf("writing parquet rows: %w", err)
|
batch := make([]ParquetMetricRow, 0, batchSize)
|
||||||
|
rowCount := 0
|
||||||
|
var writeErr error
|
||||||
|
|
||||||
|
for rowPtr := range rowChan {
|
||||||
|
if writeErr != nil {
|
||||||
|
continue // Drain the channel to prevent worker deadlock
|
||||||
|
}
|
||||||
|
batch = append(batch, *rowPtr)
|
||||||
|
if len(batch) >= batchSize {
|
||||||
|
if _, err := writer.Write(batch); err != nil {
|
||||||
|
writeErr = fmt.Errorf("writing parquet batch: %w", err)
|
||||||
|
}
|
||||||
|
rowCount += len(batch)
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if writeErr != nil {
|
||||||
|
return rowCount, writeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(batch) > 0 {
|
||||||
|
if _, err := writer.Write(batch); err != nil {
|
||||||
|
return rowCount, fmt.Errorf("writing remaining parquet batch: %w", err)
|
||||||
|
}
|
||||||
|
rowCount += len(batch)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := writer.Close(); err != nil {
|
if err := writer.Close(); err != nil {
|
||||||
return fmt.Errorf("closing parquet writer: %w", err)
|
return rowCount, fmt.Errorf("closing parquet writer: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := bw.Flush(); err != nil {
|
if err := bw.Flush(); err != nil {
|
||||||
return fmt.Errorf("flushing parquet file: %w", err)
|
return rowCount, fmt.Errorf("flushing parquet file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return rowCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns
|
// loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns
|
||||||
@@ -180,24 +212,22 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// archiveCheckpointsToParquet reads checkpoint files for a host directory,
|
// archiveCheckpointsToParquet reads checkpoint files for a host directory,
|
||||||
// converts them to Parquet rows. Returns the rows and filenames that were processed.
|
// converts them to Parquet rows. Returns the filenames that were processed.
|
||||||
func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) {
|
func archiveCheckpointsToParquet(dir, cluster, host string, from int64, rowChan chan<- *ParquetMetricRow) ([]string, error) {
|
||||||
entries, err := os.ReadDir(dir)
|
entries, err := os.ReadDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
files, err := findFiles(entries, from, false)
|
files, err := findFiles(entries, from, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(files) == 0 {
|
if len(files) == 0 {
|
||||||
return nil, nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var rows []ParquetMetricRow
|
|
||||||
|
|
||||||
for _, checkpoint := range files {
|
for _, checkpoint := range files {
|
||||||
filename := filepath.Join(dir, checkpoint)
|
filename := filepath.Join(dir, checkpoint)
|
||||||
cf, err := loadCheckpointFileFromDisk(filename)
|
cf, err := loadCheckpointFileFromDisk(filename)
|
||||||
@@ -206,8 +236,8 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows)
|
flattenCheckpointFile(cf, cluster, host, "node", "", rowChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
return rows, files, nil
|
return files, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -69,7 +69,16 @@ func TestFlattenCheckpointFile(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
rows := flattenCheckpointFile(cf, "fritz", "node001", "node", "", nil)
|
rowChan := make(chan *ParquetMetricRow, 100)
|
||||||
|
go func() {
|
||||||
|
flattenCheckpointFile(cf, "fritz", "node001", "node", "", rowChan)
|
||||||
|
close(rowChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var rows []ParquetMetricRow
|
||||||
|
for r := range rowChan {
|
||||||
|
rows = append(rows, *r)
|
||||||
|
}
|
||||||
|
|
||||||
// cpu_load: 2 non-NaN values at node scope
|
// cpu_load: 2 non-NaN values at node scope
|
||||||
// mem_bw: 2 non-NaN values at socket0 scope
|
// mem_bw: 2 non-NaN values at socket0 scope
|
||||||
@@ -153,17 +162,28 @@ func TestParquetArchiveRoundtrip(t *testing.T) {
|
|||||||
|
|
||||||
// Archive to Parquet
|
// Archive to Parquet
|
||||||
archiveDir := filepath.Join(tmpDir, "archive")
|
archiveDir := filepath.Join(tmpDir, "archive")
|
||||||
rows, files, err := archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000)
|
rowChan := make(chan *ParquetMetricRow, 100)
|
||||||
|
|
||||||
|
var files []string
|
||||||
|
var archiveErr error
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
files, archiveErr = archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000, rowChan)
|
||||||
|
close(rowChan)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet")
|
||||||
|
_, err = writeParquetArchiveStream(parquetFile, rowChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(files) != 1 || files[0] != "1000.json" {
|
<-done
|
||||||
t.Fatalf("expected 1 file, got %v", files)
|
if archiveErr != nil {
|
||||||
|
t.Fatal(archiveErr)
|
||||||
}
|
}
|
||||||
|
if len(files) != 1 || len(files) > 0 && files[0] != "1000.json" {
|
||||||
parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet")
|
t.Fatalf("expected 1 file (1000.json), got %v", files)
|
||||||
if err := writeParquetArchive(parquetFile, rows); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read back and verify
|
// Read back and verify
|
||||||
|
|||||||
Reference in New Issue
Block a user