#!/usr/bin/env python3
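"""Generate roofline plots from a job archive folder tree.

For every job directory that contains a meta.json and a data.json.gz, this
script reads the measured memory bandwidth and FLOP rate series, looks up the
machine ceilings (scalar/SIMD peak FLOP rate, memory bandwidth) for the job's
subcluster in cluster.json, and renders one color and one grayscale roofline
PNG per job. Axis ranges are fixed so the resulting images are consistent
inputs for CNN training.

Expected archive layout (directory names are illustrative):

    <base_path>/cluster.json
    <base_path>/<job-id-prefix>/<job-id-suffix>/<timestamp>/meta.json
    <base_path>/<job-id-prefix>/<job-id-suffix>/<timestamp>/data.json.gz
"""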
import gzip
import json
import argparse
from pathlib import Path
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend
import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import Pool, cpu_count


def load_job_data_from_folder(job_path):
    """Load job data from folder"""
    job_path = Path(job_path)

    # Load data.json.gz
    data_file_path = job_path / "data.json.gz"
    if not data_file_path.exists():
        raise FileNotFoundError(f"data.json.gz not found in {job_path}")

    with gzip.open(data_file_path, 'rt') as f:
        data = json.load(f)

    # Load meta.json
    meta_file_path = job_path / "meta.json"
    if not meta_file_path.exists():
        raise FileNotFoundError(f"meta.json not found in {job_path}")

    with open(meta_file_path, 'r') as f:
        meta = json.load(f)

    return data, meta


def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
    """Load cluster configuration data from folder"""
    config_path = Path(base_path) / cluster_path
    if not config_path.exists():
        raise FileNotFoundError(f"cluster.json not found at {config_path}")

    with open(config_path, 'r') as f:
        return json.load(f)


def find_job_directories(base_path):
    """Find all job directories in the folder structure"""
    base_path = Path(base_path)
    job_dirs = []

    # Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
    for prefix_dir in base_path.iterdir():
        if not prefix_dir.is_dir():
            continue
        for suffix_dir in prefix_dir.iterdir():
            if not suffix_dir.is_dir():
                continue
            for timestamp_dir in suffix_dir.iterdir():
                if not timestamp_dir.is_dir():
                    continue
                # Check if this directory contains both meta.json and data.json.gz
                if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
                    job_dirs.append(timestamp_dir)

    return job_dirs


def load_done_list(done_file_path):
    """Load list of already processed job prefixes from done.txt"""
    done_file = Path(done_file_path)
    if not done_file.exists():
        return set()

    with open(done_file, 'r') as f:
        return set(line.strip() for line in f if line.strip())


def save_done_list(done_file_path, done_set):
    """Save list of processed job prefixes to done.txt"""
    with open(done_file_path, 'w') as f:
        for prefix in sorted(done_set):
            f.write(f"{prefix}\n")


def navigate_nested_dict(d, path):
    """Navigate a nested dictionary using a dot-separated path with array indices"""
    if not path:
        return d

    current = d
    parts = path.split('.')

    for part in parts:
        if '[' in part and ']' in part:
            # Handle explicit array access syntax (e.g., "node[0]")
            array_name = part.split('[')[0]
            index = int(part.split('[')[1].split(']')[0])
            current = current[array_name][index]
        elif part.isdigit():
            # Handle numeric indices for list access
            current = current[int(part)]
        else:
            # Handle regular dict access
            current = current[part]

    return current


def get_subcluster_config(cluster_config, subcluster_name):
    """Return the index of the subcluster with the given name (0 if not found)"""
    subclusters = cluster_config.get("subClusters", [])

    for i, subcluster in enumerate(subclusters):
        if subcluster.get("name") == subcluster_name:
            return i

    return 0  # Default to first subcluster if not found
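
# For reference, get_performance_value below expects cluster.json entries of
# roughly this shape (field names taken from the code; the numbers are made up):
#
#   {"subClusters": [{"name": "main",
#                     "flopRateScalar":  {"unit": {"prefix": "G"}, "value": 50},
#                     "flopRateSimd":    {"unit": {"prefix": "G"}, "value": 400},
#                     "memoryBandwidth": {"unit": {"prefix": "G"}, "value": 350}}]}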
def get_performance_value(cluster_config, subcluster_idx, metric_path):
    """Extract a performance value with its proper unit prefix"""
    # Build the full path
    full_path = f"subClusters.{subcluster_idx}.{metric_path}"

    # Extract values
    value_parts = full_path.split('.')

    # Direct dictionary traversal instead of using navigate_nested_dict
    current = cluster_config
    for part in value_parts:
        if part.isdigit():
            current = current[int(part)]
        else:
            current = current[part]

    value = current.get("value", 0)

    # Get prefix if available
    prefix = ""
    if "unit" in current and "prefix" in current["unit"]:
        prefix = current["unit"]["prefix"]

    # Apply prefix multiplier
    prefix_multipliers = {
        'K': 1e3,
        'M': 1e6,
        'G': 1e9,
        'T': 1e12,
        'P': 1e15,
        '': 1.0
    }

    multiplier = prefix_multipliers.get(prefix, 1.0)
    return value * multiplier


def create_roofline_plot(data, meta, cluster_config, output_path=None, colorful=True, show_labels=True, size='normal'):
    """Create roofline plot with fixed axes for CNN training consistency"""
    # Get number of nodes and subcluster name
    num_nodes = meta['numNodes']
    subcluster_name = meta.get('subCluster', 'main')  # Default to 'main' if not specified

    # Get subcluster index from the cluster configuration
    subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)

    # Get performance ceilings from cluster configuration
    scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
    simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
    mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")

    print("=== Roofline Plot Configuration ===")
    print(f"Subcluster: {subcluster_name}")
    print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
    print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
    print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
    print(f"Number of Nodes: {num_nodes}")

    # Configure plot size based on size parameter
    if size == 'small':
        figsize = (3, 3)
        dpi = 100
        fontsize_labels = 10
        fontsize_title = 12
        fontsize_legend = 8
        fontsize_annotations = 8
    elif size == 'medium':
        figsize = (6, 6)
        dpi = 100
        fontsize_labels = 12
        fontsize_title = 14
        fontsize_legend = 10
        fontsize_annotations = 10
    else:  # normal
        figsize = (8, 6)
        dpi = 100
        fontsize_labels = 12
        fontsize_title = 14
        fontsize_legend = 10
        fontsize_annotations = 10

    # Fixed plot configuration for CNN training consistency
    fig = plt.figure(figsize=figsize, dpi=dpi, frameon=False)
    ax = fig.add_subplot(1, 1, 1)

    # Fixed axis ranges for CNN training consistency
    x_min, x_max = 0.01, 1000.0  # FLOP/byte
    y_min, y_max = 1.0, 10000.0  # GFLOP/s

    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_xscale('log')
    ax.set_yscale('log')

    # Add axis labels based on show_labels parameter
    if show_labels:
        ax.set_xlabel('Arithmetic Intensity [FLOP/byte]', fontsize=fontsize_labels)
        ax.set_ylabel('Performance [GFLOPS]', fontsize=fontsize_labels)
    else:
        ax.set_xlabel('')
        ax.set_ylabel('')

    # Set consistent major tick marks for the log scale
    ax.set_xticks([0.01, 0.1, 1, 10, 100, 1000])
    ax.set_yticks([1, 10, 100, 1000, 10000])

    # Add minor ticks for log scale
    ax.set_xticks([0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09,
                   0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
                   2, 3, 4, 5, 6, 7, 8, 9,
                   20, 30, 40, 50, 60, 70, 80, 90,
                   200, 300, 400, 500, 600, 700, 800, 900], minor=True)
    ax.set_yticks([2, 3, 4, 5, 6, 7, 8, 9,
                   20, 30, 40, 50, 60, 70, 80, 90,
                   200, 300, 400, 500, 600, 700, 800, 900,
                   2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000], minor=True)

    # Light colored grid
    ax.grid(True, which='major', alpha=0.4, linestyle='-', linewidth=0.8, color='gray')
    ax.grid(True, which='minor', alpha=0.2, linestyle='-', linewidth=0.5, color='lightgray')

    # Calculate knee points for the rooflines (knee intensity = peak FLOP/s / bandwidth)
    mem_bw_bytes = mem_bw  # Already in bytes/s from get_performance_value
    scalar_knee_intensity = scalar_flops / mem_bw_bytes  # FLOP/byte
    simd_knee_intensity = simd_flops / mem_bw_bytes  # FLOP/byte

    # Convert to GFLOP/s for plotting
    scalar_peak_gflops = scalar_flops / 1e9
    simd_peak_gflops = simd_flops / 1e9

    print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
    print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")

    # Create intensity range for roofline plotting
    intensity_range = np.logspace(np.log10(x_min), np.log10(x_max), 1000)

    # Define colors based on colorful flag
    if colorful:
        memory_color = 'black'
        scalar_color = 'red'
        simd_color = 'blue'
        data_color = 'orange'
    else:
        memory_color = 'black'
        scalar_color = 'darkgray'
        simd_color = 'gray'
        data_color = 'black'

    # Memory bandwidth roof (diagonal line): FLOP/byte * GB/s = GFLOP/s
    memory_roof = intensity_range * (mem_bw_bytes / 1e9)
    memory_roof = np.clip(memory_roof, y_min, y_max)

    # Plot memory bandwidth roof up to the vector knee point
    memory_mask = intensity_range <= min(simd_knee_intensity, x_max)
    ax.plot(intensity_range[memory_mask], memory_roof[memory_mask],
            color=memory_color, linewidth=2, linestyle='-',
            label=f'Memory BW Roof ({mem_bw_bytes/1e9:.1f} GB/s)')

    # Scalar compute roof (horizontal line)
    if scalar_knee_intensity <= x_max and scalar_peak_gflops <= y_max:
        scalar_x_range = intensity_range[intensity_range >= scalar_knee_intensity]
        scalar_y_values = np.full_like(scalar_x_range, scalar_peak_gflops)
        ax.plot(scalar_x_range, scalar_y_values,
                color=scalar_color, linewidth=2, linestyle='-',
                label=f'Scalar Roof ({scalar_peak_gflops:.1f} GFLOPS)')

    # SIMD compute roof (horizontal line)
    if simd_knee_intensity <= x_max and simd_peak_gflops <= y_max:
        simd_x_range = intensity_range[intensity_range >= simd_knee_intensity]
        simd_y_values = np.full_like(simd_x_range, simd_peak_gflops)
        ax.plot(simd_x_range, simd_y_values,
                color=simd_color, linewidth=2, linestyle='-',
                label=f'SIMD Roof ({simd_peak_gflops:.1f} GFLOPS)')

print(f"\n=== Data Points ===")
|
|
print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
|
|
print("-" * 75)
|
|
|
|
# Process job data points
|
|
has_valid_data = False
|
|
all_intensities = []
|
|
all_performances = []
|
|
all_timestamps = [] # Track timestamps for coloring
|
|
all_nodes = [] # Track node numbers for coloring
|
|
|
|
# Statistics for data filtering
|
|
total_data_points = 0
|
|
filtered_invalid = 0
|
|
filtered_above_roof = 0
|
|
filtered_out_of_bounds = 0
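
    # The loop below assumes data.json holds per-node series for the two metrics
    # used here, roughly shaped like this (metric names from the code, values
    # invented for illustration):
    #
    #   {"mem_bw":    {"node": {"series": [{"data": [210.4, 198.7, ...]}, ...]}},
    #    "flops_any": {"node": {"series": [{"data": [950.2, 1013.5, ...]}, ...]}}}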
    for node_num in range(num_nodes):
        try:
            # Extract data series
            mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
            flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])

            if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
                continue

            mem_bw_data = mem_bw_series[node_num].get("data", [])
            flops_data = flops_series[node_num].get("data", [])

            # Ensure we have the same number of time steps for both metrics
            num_timesteps = min(len(mem_bw_data), len(flops_data))
            if num_timesteps == 0:
                continue

            total_data_points += num_timesteps

            # Process each time step
            for t_idx in range(num_timesteps):
                # Get raw data values for this time step
                bw_raw = mem_bw_data[t_idx]  # Memory bandwidth (raw value)
                flops_raw = flops_data[t_idx]  # FLOP rate (raw value)

                # Skip invalid data (None, NaN, infinite, or non-positive values)
                if (bw_raw is None or flops_raw is None or
                        (isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
                        (isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
                        bw_raw <= 0 or flops_raw <= 0):
                    filtered_invalid += 1
                    continue

                # Arithmetic intensity needs consistent units; assume the raw
                # values are already in GFLOP/s and GB/s respectively
                flops_gflops = flops_raw  # Assume raw is already in GFLOP/s
                bw_gb_per_sec = bw_raw  # Assume raw is in GB/s

                # Calculate AI in FLOP/byte: (GFLOP/s) / (GB/s) = FLOP/byte
                arith_intensity = flops_gflops / bw_gb_per_sec

                # Skip if arithmetic intensity is too extreme (likely measurement error)
                if arith_intensity < 1e-6 or arith_intensity > 1e6:
                    filtered_invalid += 1
                    continue

                # Use raw FLOPS value for plotting (assumed to be in GFLOP/s)
                performance_for_plot = flops_raw

                # The theoretical ceiling at this intensity is the higher compute roof,
                # limited by the memory bandwidth roof
                max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)
                mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9)  # GFLOP/s
                effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)

                # Skip unrealistic data points significantly above the roofline,
                # allowing some measurement tolerance (5% above theoretical max)
                if performance_for_plot > effective_ceiling * 1.05:
                    filtered_above_roof += 1
                    continue

                # Print every 10th point for monitoring to avoid spamming the log
                if t_idx % 10 == 0:
                    print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_for_plot:15.2f} {arith_intensity:15.4f}")

                # Check if the point is within the fixed plot bounds
                if (x_min <= arith_intensity <= x_max and
                        y_min <= performance_for_plot <= y_max):
                    all_intensities.append(arith_intensity)
                    all_performances.append(performance_for_plot)
                    all_timestamps.append(t_idx)  # Store timestamp for coloring
                    all_nodes.append(node_num)  # Store node number for coloring
                    has_valid_data = True
                else:
                    filtered_out_of_bounds += 1

        except (KeyError, IndexError, ValueError) as e:
            print(f"Warning: Error processing data for node {node_num}: {e}")
            continue

    # Plot all data points with timestamp-based coloring
    if has_valid_data:
        if colorful:
            # Define color palette for different nodes
            node_colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown', 'pink', 'gray', 'olive', 'cyan']

            # Group data by node for plotting
            node_data = {}
            for intensity, performance, timestamp, node_num in zip(all_intensities, all_performances, all_timestamps, all_nodes):
                if node_num not in node_data:
                    node_data[node_num] = {'intensities': [], 'performances': [], 'timestamps': []}
                node_data[node_num]['intensities'].append(intensity)
                node_data[node_num]['performances'].append(performance)
                node_data[node_num]['timestamps'].append(timestamp)

            # Plot each node with its own color, but with timestamp-based alpha
            for node_num in sorted(node_data.keys()):
                node_intensities = node_data[node_num]['intensities']
                node_performances = node_data[node_num]['performances']
                node_timestamps = node_data[node_num]['timestamps']

                # Get base color for this node
                base_color = node_colors[node_num % len(node_colors)]

                # Calculate alpha values based on timestamp progression within this node
                if len(node_timestamps) > 1:
                    min_time = min(node_timestamps)
                    max_time = max(node_timestamps)
                    alphas = []
                    for t in node_timestamps:
                        # Normalize timestamp to 0-1 range
                        normalized_time = (t - min_time) / (max_time - min_time) if max_time > min_time else 0
                        # Map to alpha: 0.3 (early) to 0.9 (late)
                        alpha = 0.3 + 0.6 * normalized_time
                        alphas.append(alpha)
                else:
                    # Single timestamp or no variation
                    alphas = [0.7] * len(node_timestamps)

                # Plot points for this node
                for intensity, performance, alpha in zip(node_intensities, node_performances, alphas):
                    ax.scatter(intensity, performance, s=20, alpha=alpha, c=base_color, edgecolors='none')
        else:
            # Grayscale mode: use timestamp-based grayscale coloring across all nodes
            if len(all_timestamps) > 1:
                min_time = min(all_timestamps)
                max_time = max(all_timestamps)

                colors = []
                alphas = []
                for t in all_timestamps:
                    # Normalize timestamp to 0-1 range
                    normalized_time = (t - min_time) / (max_time - min_time) if max_time > min_time else 0
                    # Map to grayscale: 0.0 (black) to 0.7 (light gray)
                    gray_value = 0.7 * normalized_time
                    colors.append(str(gray_value))
                    # Map to alpha: 0.3 (early) to 0.9 (late)
                    alpha = 0.3 + 0.6 * normalized_time
                    alphas.append(alpha)
            else:
                # Single timestamp or no variation
                colors = ['black'] * len(all_timestamps)
                alphas = [0.7] * len(all_timestamps)

            # Plot all points with timestamp-based coloring and alpha
            for intensity, performance, color, alpha in zip(all_intensities, all_performances, colors, alphas):
                ax.scatter(intensity, performance, s=20, alpha=alpha, c=color, edgecolors='none')

        # Print filtering statistics
        valid_points = len(all_intensities)
        print("\n=== Data Filtering Statistics ===")
        print(f"Total data points processed: {total_data_points}")
        print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
        print(f"Filtered out - Above roofline: {filtered_above_roof}")
        print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
        print(f"Valid data points plotted: {valid_points}")
        if total_data_points > 0:
            print(f"Data retention rate: {(valid_points / total_data_points) * 100:.1f}%")
        else:
            print("No data points")

        print(f"\nPlotted {valid_points} valid data points with {'node-based coloring' if colorful else 'timestamp-based coloring'}")
    else:
        print("\n=== Data Filtering Statistics ===")
        print(f"Total data points processed: {total_data_points}")
        print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
        print(f"Filtered out - Above roofline: {filtered_above_roof}")
        print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
        print("Valid data points plotted: 0")
        print("\nNo valid data points found within the plot bounds - skipping plot generation")
        plt.close(fig)
        return False  # Return False to indicate no plot was generated

    # Add title and legend
    if show_labels:
        job_id = meta.get('jobId', 'unknown')
        ax.set_title(f'Roofline Plot - Job {job_id} ({subcluster_name})', fontsize=fontsize_title)
        ax.legend(loc='lower right', fontsize=fontsize_legend)

    # Save or display
    if output_path:
        plt.tight_layout()
        # Adjust DPI based on size for specific pixel dimensions
        if size == 'small':
            save_dpi = 100  # 300x300 pixels at 3x3 inches
        elif size == 'medium':
            save_dpi = 100  # 600x600 pixels at 6x6 inches
        else:
            save_dpi = 300  # High quality for normal size

        plt.savefig(output_path, format='png', dpi=save_dpi, bbox_inches='tight', facecolor='white')
        plt.close(fig)
        print(f"\nSaved roofline plot to: {output_path}")
        return True  # Return True to indicate successful plot generation
    else:
        plt.tight_layout()
        plt.show()
        print("\nDisplayed roofline plot")
        return True  # Return True to indicate successful plot generation


def process_single_job(args):
    """Process a single job - designed for multiprocessing"""
    job_path, cluster_config, output_base_dir = args

    try:
        # Parse job directory structure: {prefix}/{suffix}/{timestamp}
        parts = job_path.parts
        if len(parts) < 3:
            return False, f"Invalid job path structure: {job_path}"

        job_id_prefix = parts[-3]
        job_id_suffix = parts[-2]
        timestamp = parts[-1]
        job_id = job_id_prefix + job_id_suffix

        # Load job data
        data, meta = load_job_data_from_folder(job_path)

        # Check job duration - skip jobs less than 60 seconds
        duration = meta.get('duration', 0)
        if duration < 60:
            return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"

        # Create output directory for this job prefix
        output_dir = Path(output_base_dir) / job_id_prefix
        output_dir.mkdir(parents=True, exist_ok=True)

        # Generate both colorful and grayscale plots
        base_filename = f"{job_id}_{timestamp}"

        # Colorful plot
        colorful_path = output_dir / f"{base_filename}_color.png"
        colorful_success = create_roofline_plot(data, meta, cluster_config,
                                                output_path=str(colorful_path),
                                                colorful=True, show_labels=False, size='medium')

        # Grayscale plot
        grayscale_path = output_dir / f"{base_filename}_grayscale.png"
        grayscale_success = create_roofline_plot(data, meta, cluster_config,
                                                 output_path=str(grayscale_path),
                                                 colorful=False, show_labels=False, size='medium')

        # Check if at least one plot was generated
        if colorful_success or grayscale_success:
            return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s)"
        else:
            return False, f"Skipped job {job_id}_{timestamp}: no valid data to plot"

    except Exception as e:
        return False, f"Error processing job {job_path}: {e}"
def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None):
    """Process all jobs in the folder structure using multiprocessing"""
    print(f"Processing jobs from: {base_path}")
    print(f"Output directory: {output_base_dir}")

    # Load cluster configuration
    try:
        cluster_config = load_cluster_config_from_folder(base_path)
        print("Loaded cluster configuration")
    except FileNotFoundError as e:
        print(f"Error loading cluster config: {e}")
        return

    # Load already processed jobs
    done_set = load_done_list(done_file_path)
    print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")

    # Find all job directories
    job_dirs = find_job_directories(base_path)
    print(f"Found {len(job_dirs)} total job directories")

    # Filter out already processed jobs
    filtered_job_dirs = []
    for job_path in job_dirs:
        parts = job_path.parts
        if len(parts) >= 3:
            job_id_prefix = parts[-3]
            if job_id_prefix not in done_set:
                filtered_job_dirs.append(job_path)

    print(f"Found {len(filtered_job_dirs)} unprocessed job directories")

    if not filtered_job_dirs:
        print("No new jobs to process")
        return

    # Determine number of processes
    if num_processes is None:
        num_processes = min(12, cpu_count())  # Use up to 12 cores, limited by available cores

    print(f"Using {num_processes} processes for parallel processing")

    # Prepare arguments for multiprocessing
    process_args = [(job_path, cluster_config, output_base_dir) for job_path in filtered_job_dirs]

    processed_count = 0
    skipped_count = 0
    processed_prefixes = set()

    # Process jobs in parallel
    if num_processes == 1:
        # Sequential processing for debugging
        results = [process_single_job(args) for args in process_args]
    else:
        # Parallel processing with a worker pool
        with Pool(num_processes) as pool:
            results = pool.map(process_single_job, process_args)

    # Collect results
    for i, (success, message) in enumerate(results):
        print(message)
        if success:
            processed_count += 1
            # Extract job prefix for done tracking
            job_path = filtered_job_dirs[i]
            parts = job_path.parts
            if len(parts) >= 3:
                job_id_prefix = parts[-3]
                processed_prefixes.add(job_id_prefix)
        else:
            skipped_count += 1

    # Update done list with newly processed prefixes
    if processed_prefixes:
        updated_done_set = done_set | processed_prefixes
        save_done_list(done_file_path, updated_done_set)
        print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")

    print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs (duration < 60s or no valid data)")


def main():
    parser = argparse.ArgumentParser(description='Generate roofline plots from folder structure')
    parser.add_argument('--base-path', default=r'D:\fritz',
                        help='Path to job archive folder')
    parser.add_argument('--output-dir', default='roofline_plots',
                        help='Output directory for plots')
    parser.add_argument('--done-file', default='done.txt',
                        help='File to track processed job prefixes')
    parser.add_argument('--processes', type=int, default=12,
                        help='Number of parallel processes to use (default: 12)')

    args = parser.parse_args()

    try:
        process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes)
    except Exception as e:
        print(f"Error processing jobs: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
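
# Example invocation (paths are illustrative; adjust to the local archive and
# to whatever name this file is saved under):
#   python roofline_plots_from_folder.py --base-path /data/fritz \
#       --output-dir roofline_plots --done-file done.txt --processes 8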