slurm-application-detection/roofline_plotter.py
#!/usr/bin/env python3
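"""Generate roofline plots from a job archive folder structure.

For every job directory (layout: {prefix}/{suffix}/{timestamp} containing
meta.json and data.json.gz) the script reads per-node mem_bw and flops_any
series, filters invalid or out-of-bounds samples, and renders colorful and
grayscale roofline plots with fixed axes intended as CNN training input.
Processed job prefixes are recorded in done.txt so that reruns skip them.
"""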
import os
import gzip
import json
import argparse
from pathlib import Path
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from multiprocessing import Pool, cpu_count
import functools


def load_job_data_from_folder(job_path):
    """Load job data from folder"""
    job_path = Path(job_path)
    # Load data.json.gz
    data_file_path = job_path / "data.json.gz"
    if not data_file_path.exists():
        raise FileNotFoundError(f"data.json.gz not found in {job_path}")
    with gzip.open(data_file_path, 'rt') as f:
        data = json.load(f)
    # Load meta.json
    meta_file_path = job_path / "meta.json"
    if not meta_file_path.exists():
        raise FileNotFoundError(f"meta.json not found in {job_path}")
    with open(meta_file_path, 'r') as f:
        meta = json.load(f)
    return data, meta


def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
    """Load cluster configuration data from folder"""
    config_path = Path(base_path) / cluster_path
    if not config_path.exists():
        raise FileNotFoundError(f"cluster.json not found at {config_path}")
    with open(config_path, 'r') as f:
        return json.load(f)


def find_job_directories(base_path):
    """Find all job directories in the folder structure"""
    base_path = Path(base_path)
    job_dirs = []
    # Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
    for prefix_dir in base_path.iterdir():
        if not prefix_dir.is_dir():
            continue
        for suffix_dir in prefix_dir.iterdir():
            if not suffix_dir.is_dir():
                continue
            for timestamp_dir in suffix_dir.iterdir():
                if not timestamp_dir.is_dir():
                    continue
                # Check if this directory contains both meta.json and data.json.gz
                if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
                    job_dirs.append(timestamp_dir)
    return job_dirs
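
# Expected archive layout (illustrative example; the directory names below are
# placeholders, not taken from the source):
#   <base_path>/
#       cluster.json
#       123/
#           456/
#               1697280000/
#                   meta.json
#                   data.json.gz
# find_job_directories() returns every third-level directory that contains
# both meta.json and data.json.gz.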


def load_done_list(done_file_path):
    """Load list of already processed job prefixes from done.txt"""
    done_file = Path(done_file_path)
    if not done_file.exists():
        return set()
    with open(done_file, 'r') as f:
        return set(line.strip() for line in f if line.strip())


def save_done_list(done_file_path, done_set):
    """Save list of processed job prefixes to done.txt"""
    with open(done_file_path, 'w') as f:
        for prefix in sorted(done_set):
            f.write(f"{prefix}\n")


def navigate_nested_dict(d, path):
    """Navigate a nested dictionary using a dot-separated path with array indices"""
    if not path:
        return d
    current = d
    parts = path.split('.')
    for part in parts:
        if '[' in part and ']' in part:
            # Handle explicit array access syntax (e.g., "node[0]")
            array_name = part.split('[')[0]
            index = int(part.split('[')[1].split(']')[0])
            current = current[array_name][index]
        elif part.isdigit():
            # Handle numeric indices for list access
            current = current[int(part)]
        else:
            # Handle regular dict access
            current = current[part]
    return current


def get_subcluster_config(cluster_config, subcluster_name):
    """Get the index of a specific subcluster by name"""
    subclusters = cluster_config.get("subClusters", [])
    for i, subcluster in enumerate(subclusters):
        if subcluster.get("name") == subcluster_name:
            return i
    return 0  # Default to first subcluster if not found


def get_performance_value(cluster_config, subcluster_idx, metric_path):
    """Extract a performance value with its proper unit prefix"""
    # Build the full path
    full_path = f"subClusters.{subcluster_idx}.{metric_path}"
    # Extract values
    value_parts = full_path.split('.')
    # Direct dictionary traversal instead of using navigate_nested_dict
    current = cluster_config
    for part in value_parts:
        if part.isdigit():
            current = current[int(part)]
        else:
            current = current[part]
    value = current.get("value", 0)
    # Get prefix if available
    prefix = ""
    if "unit" in current and "prefix" in current["unit"]:
        prefix = current["unit"]["prefix"]
    # Apply prefix multiplier
    prefix_multipliers = {
        'K': 1e3,
        'M': 1e6,
        'G': 1e9,
        'T': 1e12,
        'P': 1e15,
        '': 1.0
    }
    multiplier = prefix_multipliers.get(prefix, 1.0)
    return value * multiplier
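
# Illustrative cluster.json fragment showing the fields read above (an assumption
# about the archive format inferred from the code, not copied from the source):
#   {
#     "subClusters": [
#       {
#         "name": "main",
#         "flopRateScalar":  {"value": 80,   "unit": {"prefix": "G"}},
#         "flopRateSimd":    {"value": 2500, "unit": {"prefix": "G"}},
#         "memoryBandwidth": {"value": 200,  "unit": {"prefix": "G"}}
#       }
#     ]
#   }
# get_performance_value() multiplies "value" by the prefix multiplier (G -> 1e9),
# so the returned numbers are in base units (FLOP/s and bytes/s).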


def create_roofline_plot(data, meta, cluster_config, output_path=None, colorful=True, show_labels=True, size='normal'):
    """Create roofline plot with fixed axes for CNN training consistency"""
    # Get number of nodes and subcluster name
    num_nodes = meta['numNodes']
    subcluster_name = meta.get('subCluster', 'main')  # Default to 'main' if not specified
    # Get subcluster index from the cluster configuration
    subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)
    # Get performance ceilings from cluster configuration
    scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
    simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
    mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")

    print(f"=== Roofline Plot Configuration ===")
    print(f"Subcluster: {subcluster_name}")
    print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
    print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
    print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
    print(f"Number of Nodes: {num_nodes}")

    # Configure plot size based on size parameter
    if size == 'small':
        figsize = (3, 3)
        dpi = 100
        fontsize_labels = 10
        fontsize_title = 12
        fontsize_legend = 8
        fontsize_annotations = 8
    elif size == 'medium':
        figsize = (6, 6)
        dpi = 100
        fontsize_labels = 12
        fontsize_title = 14
        fontsize_legend = 10
        fontsize_annotations = 10
    else:  # normal
        figsize = (8, 6)
        dpi = 100
        fontsize_labels = 12
        fontsize_title = 14
        fontsize_legend = 10
        fontsize_annotations = 10

    # Fixed plot configuration for CNN training consistency
    fig = plt.figure(figsize=figsize, dpi=dpi, frameon=False)
    ax = fig.add_subplot(1, 1, 1)
    # Fixed axis ranges for CNN training consistency
    x_min, x_max = 0.01, 1000.0  # FLOP/byte
    y_min, y_max = 1.0, 10000.0  # GFLOP/s
    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_xscale('log')
    ax.set_yscale('log')
    # Add axis labels based on show_labels parameter
    if show_labels:
        ax.set_xlabel('Arithmetic Intensity [FLOP/byte]', fontsize=fontsize_labels)
        ax.set_ylabel('Performance [GFLOPS]', fontsize=fontsize_labels)
    else:
        ax.set_xlabel('')
        ax.set_ylabel('')
    # Set consistent tick marks with minor ticks for log scale
    ax.set_xticks([0.01, 0.1, 1, 10, 100, 1000])
    ax.set_yticks([1, 10, 100, 1000, 10000])
    # Add minor ticks for log scale
    ax.set_xticks([0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09,
                   0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
                   2, 3, 4, 5, 6, 7, 8, 9,
                   20, 30, 40, 50, 60, 70, 80, 90,
                   200, 300, 400, 500, 600, 700, 800, 900], minor=True)
    ax.set_yticks([2, 3, 4, 5, 6, 7, 8, 9,
                   20, 30, 40, 50, 60, 70, 80, 90,
                   200, 300, 400, 500, 600, 700, 800, 900,
                   2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000], minor=True)
    # Light colored grid
    ax.grid(True, which='major', alpha=0.4, linestyle='-', linewidth=0.8, color='gray')
    ax.grid(True, which='minor', alpha=0.2, linestyle='-', linewidth=0.5, color='lightgray')
    # Title font size falls back to a smaller value for non-normal sizes
    fontsize_title = 14 if size == 'normal' else 12

    # Calculate knee points for rooflines
    # mem_bw is already in bytes/s (unit prefix applied in get_performance_value)
    mem_bw_bytes = mem_bw
    scalar_knee_intensity = scalar_flops / mem_bw_bytes  # FLOP/byte
    simd_knee_intensity = simd_flops / mem_bw_bytes  # FLOP/byte
    # Convert to GFLOP/s for plotting
    scalar_peak_gflops = scalar_flops / 1e9
    simd_peak_gflops = simd_flops / 1e9
    print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
    print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")

    # Create intensity range for roofline plotting
    intensity_range = np.logspace(np.log10(x_min), np.log10(x_max), 1000)
    # Define colors based on colorful flag
    if colorful:
        memory_color = 'black'
        scalar_color = 'red'
        simd_color = 'blue'
        data_color = 'orange'
    else:
        memory_color = 'black'
        scalar_color = 'darkgray'
        simd_color = 'gray'
        data_color = 'black'
    # Memory bandwidth roof (diagonal line)
    memory_roof = intensity_range * (mem_bw_bytes / 1e9)  # bytes/s scaled to GB/s so the roof is in GFLOP/s
    memory_roof = np.clip(memory_roof, y_min, y_max)
    # Plot memory bandwidth roof up to the vector knee point
    memory_mask = intensity_range <= min(simd_knee_intensity, x_max)
    ax.plot(intensity_range[memory_mask], memory_roof[memory_mask],
            color=memory_color, linewidth=2, linestyle='-',
            label=f'Memory BW Roof ({mem_bw_bytes/1e9:.1f} GB/s)')
    # Scalar compute roof (horizontal line)
    if scalar_knee_intensity <= x_max and scalar_peak_gflops <= y_max:
        scalar_x_range = intensity_range[intensity_range >= scalar_knee_intensity]
        scalar_y_values = np.full_like(scalar_x_range, scalar_peak_gflops)
        ax.plot(scalar_x_range, scalar_y_values,
                color=scalar_color, linewidth=2, linestyle='-',
                label=f'Scalar Roof ({scalar_peak_gflops:.1f} GFLOPS)')
    # SIMD compute roof (horizontal line)
    if simd_knee_intensity <= x_max and simd_peak_gflops <= y_max:
        simd_x_range = intensity_range[intensity_range >= simd_knee_intensity]
        simd_y_values = np.full_like(simd_x_range, simd_peak_gflops)
        ax.plot(simd_x_range, simd_y_values,
                color=simd_color, linewidth=2, linestyle='-',
                label=f'SIMD Roof ({simd_peak_gflops:.1f} GFLOPS)')

    print(f"\n=== Data Points ===")
    print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
    print("-" * 75)
    # Process job data points
    has_valid_data = False
    all_intensities = []
    all_performances = []
    all_timestamps = []  # Track timestamps for coloring
    all_nodes = []  # Track node numbers for coloring
    # Statistics for data filtering
    total_data_points = 0
    filtered_invalid = 0
    filtered_above_roof = 0
    filtered_out_of_bounds = 0
    for node_num in range(num_nodes):
        try:
            # Extract data series
            mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
            flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])
            if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
                continue
            mem_bw_data = mem_bw_series[node_num].get("data", [])
            flops_data = flops_series[node_num].get("data", [])
            # Ensure we have the same number of time steps for both metrics
            num_timesteps = min(len(mem_bw_data), len(flops_data))
            if num_timesteps == 0:
                continue
            total_data_points += num_timesteps
            # Process each time step
            for t_idx in range(num_timesteps):
                bw_raw = mem_bw_data[t_idx]  # Memory bandwidth (raw value)
                flops_raw = flops_data[t_idx]  # Performance (raw value)
                # Skip if we have invalid data (None, NaN, inf, or non-positive values)
                if (bw_raw is None or flops_raw is None or
                        (isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
                        (isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
                        bw_raw <= 0 or flops_raw <= 0):
                    filtered_invalid += 1
                    continue
                # Assume the raw series are already in GFLOP/s and GB/s; their ratio
                # then gives the arithmetic intensity in FLOP/byte directly.
                flops_gflops = flops_raw
                bw_gb_per_sec = bw_raw
                # Calculate AI in FLOP/byte: (GFLOP/s) / (GB/s) = FLOP/byte
                arith_intensity = flops_gflops / bw_gb_per_sec
                # Additional validation: skip if arithmetic intensity is too extreme
                # (likely a measurement error)
                if arith_intensity < 1e-6 or arith_intensity > 1e6:
                    filtered_invalid += 1
                    continue
                # Use raw FLOPS value for plotting (assumed to be in GFLOP/s)
                performance_for_plot = flops_raw
                # Check if performance exceeds the theoretical maximum (above roofline)
                # Use the higher of SIMD or scalar peak performance as the ceiling
                max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)
                # Also check against memory bandwidth ceiling at this intensity
                mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9)  # GFLOP/s
                effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)
                # Skip unrealistic data points that are significantly above the roofline
                # Allow for some measurement tolerance (e.g., 5% above theoretical max)
                if performance_for_plot > effective_ceiling * 1.05:
                    filtered_above_roof += 1
                    continue
                # Print key data points for monitoring
                if t_idx % 10 == 0:  # Print every 10th point to avoid spam
                    print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_for_plot:15.2f} {arith_intensity:15.4f}")
                # Check if points are within our fixed bounds
                if (x_min <= arith_intensity <= x_max and
                        y_min <= performance_for_plot <= y_max):
                    all_intensities.append(arith_intensity)
                    all_performances.append(performance_for_plot)
                    all_timestamps.append(t_idx)  # Store timestamp for coloring
                    all_nodes.append(node_num)  # Store node number for coloring
                    has_valid_data = True
                else:
                    filtered_out_of_bounds += 1
        except (KeyError, IndexError, ValueError) as e:
            print(f"Warning: Error processing data for node {node_num}: {e}")
            continue

    # Plot all data points with timestamp-based coloring
    if has_valid_data:
        if colorful:
            # Define color palette for different nodes
            node_colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown', 'pink', 'gray', 'olive', 'cyan']
            # Group data by node for plotting
            node_data = {}
            for intensity, performance, timestamp, node_num in zip(all_intensities, all_performances, all_timestamps, all_nodes):
                if node_num not in node_data:
                    node_data[node_num] = {'intensities': [], 'performances': [], 'timestamps': []}
                node_data[node_num]['intensities'].append(intensity)
                node_data[node_num]['performances'].append(performance)
                node_data[node_num]['timestamps'].append(timestamp)
            # Plot each node with its own color, but with timestamp-based alpha
            for node_num in sorted(node_data.keys()):
                node_intensities = node_data[node_num]['intensities']
                node_performances = node_data[node_num]['performances']
                node_timestamps = node_data[node_num]['timestamps']
                # Get base color for this node
                base_color = node_colors[node_num % len(node_colors)]
                # Calculate alpha values based on timestamp progression within this node
                if len(node_timestamps) > 1:
                    min_time = min(node_timestamps)
                    max_time = max(node_timestamps)
                    alphas = []
                    for t in node_timestamps:
                        # Normalize timestamp to 0-1 range
                        normalized_time = (t - min_time) / (max_time - min_time) if max_time > min_time else 0
                        # Map to alpha: 0.3 (early) to 0.9 (late)
                        alpha = 0.3 + 0.6 * normalized_time
                        alphas.append(alpha)
                else:
                    # Single timestamp or no variation
                    alphas = [0.7] * len(node_timestamps)
                # Plot points for this node
                for intensity, performance, alpha in zip(node_intensities, node_performances, alphas):
                    ax.scatter(intensity, performance, s=20, alpha=alpha, c=base_color, edgecolors='none')
        else:
            # Grayscale mode: use timestamp-based grayscale coloring across all nodes
            if len(all_timestamps) > 1:
                min_time = min(all_timestamps)
                max_time = max(all_timestamps)
                colors = []
                alphas = []
                for t in all_timestamps:
                    # Normalize timestamp to 0-1 range
                    normalized_time = (t - min_time) / (max_time - min_time) if max_time > min_time else 0
                    # Map to grayscale: 0.0 (black) to 0.7 (light gray)
                    gray_value = 0.7 * normalized_time
                    colors.append(str(gray_value))
                    # Map to alpha: 0.3 (early) to 0.9 (late)
                    alpha = 0.3 + 0.6 * normalized_time
                    alphas.append(alpha)
            else:
                # Single timestamp or no variation
                colors = ['black'] * len(all_timestamps)
                alphas = [0.7] * len(all_timestamps)
            # Plot all points with timestamp-based coloring and alpha
            for intensity, performance, color, alpha in zip(all_intensities, all_performances, colors, alphas):
                ax.scatter(intensity, performance, s=20, alpha=alpha, c=color, edgecolors='none')
        # Print filtering statistics
        valid_points = len(all_intensities)
        print(f"\n=== Data Filtering Statistics ===")
        print(f"Total data points processed: {total_data_points}")
        print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
        print(f"Filtered out - Above roofline: {filtered_above_roof}")
        print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
        print(f"Valid data points plotted: {valid_points}")
        print(f"Data retention rate: {(valid_points/total_data_points)*100:.1f}%" if total_data_points > 0 else "No data points")
        print(f"\nPlotted {len(all_intensities)} valid data points with {'node-based coloring' if colorful else 'timestamp-based coloring'}")
    else:
        print(f"\n=== Data Filtering Statistics ===")
        print(f"Total data points processed: {total_data_points}")
        print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
        print(f"Filtered out - Above roofline: {filtered_above_roof}")
        print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
        print(f"Valid data points plotted: 0")
        print("\nNo valid data points found within the plot bounds - skipping plot generation")
        plt.close(fig)
        return False  # Return False to indicate no plot was generated

    # Add title and legend
    if show_labels:
        job_id = meta.get('jobId', 'unknown')
        ax.set_title(f'Roofline Plot - Job {job_id} ({subcluster_name})', fontsize=fontsize_title)
        ax.legend(loc='lower right', fontsize=fontsize_legend)
    # Save or display
    if output_path:
        plt.tight_layout()
        # Adjust DPI based on size for specific pixel dimensions
        if size == 'small':
            save_dpi = 100  # 300x300 pixels at 3x3 inches
        elif size == 'medium':
            save_dpi = 100  # 600x600 pixels at 6x6 inches
        else:
            save_dpi = 300  # High quality for normal size
        plt.savefig(output_path, format='png', dpi=save_dpi, bbox_inches='tight', facecolor='white')
        plt.close(fig)
        print(f"\nSaved roofline plot to: {output_path}")
        return True  # Return True to indicate successful plot generation
    else:
        plt.tight_layout()
        plt.show()
        print("\nDisplayed roofline plot")
        return True  # Return True to indicate successful plot generation


def process_single_job(args):
    """Process a single job - designed for multiprocessing"""
    job_path, cluster_config, output_base_dir = args
    try:
        # Parse job directory structure: {prefix}/{suffix}/{timestamp}
        parts = job_path.parts
        if len(parts) < 3:
            return False, f"Invalid job path structure: {job_path}"
        job_id_prefix = parts[-3]
        job_id_suffix = parts[-2]
        timestamp = parts[-1]
        job_id = job_id_prefix + job_id_suffix
        # Load job data
        data, meta = load_job_data_from_folder(job_path)
        # Check job duration - skip jobs less than 60 seconds
        duration = meta.get('duration', 0)
        if duration < 60:
            return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"
        # Create output directory for this job prefix
        output_dir = Path(output_base_dir) / job_id_prefix
        output_dir.mkdir(parents=True, exist_ok=True)
        # Generate both colorful and grayscale plots
        base_filename = f"{job_id}_{timestamp}"
        # Colorful plot
        colorful_path = output_dir / f"{base_filename}_color.png"
        colorful_success = create_roofline_plot(data, meta, cluster_config,
                                                output_path=str(colorful_path),
                                                colorful=True, show_labels=False, size='medium')
        # Grayscale plot
        grayscale_path = output_dir / f"{base_filename}_grayscale.png"
        grayscale_success = create_roofline_plot(data, meta, cluster_config,
                                                 output_path=str(grayscale_path),
                                                 colorful=False, show_labels=False, size='medium')
        # Check if at least one plot was generated
        if colorful_success or grayscale_success:
            return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s)"
        else:
            return False, f"Skipped job {job_id}_{timestamp}: no valid data to plot"
    except Exception as e:
        return False, f"Error processing job {job_path}: {e}"


def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None):
    """Process all jobs in the folder structure using multiprocessing"""
    print(f"Processing jobs from: {base_path}")
    print(f"Output directory: {output_base_dir}")
    # Load cluster configuration
    try:
        cluster_config = load_cluster_config_from_folder(base_path)
        print("Loaded cluster configuration")
    except FileNotFoundError as e:
        print(f"Error loading cluster config: {e}")
        return
    # Load already processed jobs
    done_set = load_done_list(done_file_path)
    print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")
    # Find all job directories
    job_dirs = find_job_directories(base_path)
    print(f"Found {len(job_dirs)} total job directories")
    # Filter out already processed jobs
    filtered_job_dirs = []
    for job_path in job_dirs:
        parts = job_path.parts
        if len(parts) >= 3:
            job_id_prefix = parts[-3]
            if job_id_prefix not in done_set:
                filtered_job_dirs.append(job_path)
    print(f"Found {len(filtered_job_dirs)} unprocessed job directories")
    if not filtered_job_dirs:
        print("No new jobs to process")
        return
    # Determine number of processes
    if num_processes is None:
        num_processes = min(12, cpu_count())  # Use up to 12 cores or the available core count
    print(f"Using {num_processes} processes for parallel processing")
    # Prepare arguments for multiprocessing
    process_args = [(job_path, cluster_config, output_base_dir) for job_path in filtered_job_dirs]
    processed_count = 0
    skipped_count = 0
    processed_prefixes = set()
    # Process jobs in parallel
    if num_processes == 1:
        # Single-process execution for easier debugging
        results = [process_single_job(args) for args in process_args]
    else:
        # Parallel execution with a multiprocessing worker pool
        with Pool(num_processes) as pool:
            results = pool.map(process_single_job, process_args)
    # Collect results
    for i, (success, message) in enumerate(results):
        print(message)
        if success:
            processed_count += 1
            # Extract job prefix for done tracking
            job_path = filtered_job_dirs[i]
            parts = job_path.parts
            if len(parts) >= 3:
                job_id_prefix = parts[-3]
                processed_prefixes.add(job_id_prefix)
        else:
            skipped_count += 1
    # Update done list with newly processed prefixes
    if processed_prefixes:
        updated_done_set = done_set | processed_prefixes
        save_done_list(done_file_path, updated_done_set)
        print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")
    print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs (duration < 60s or no valid data)")


def main():
    parser = argparse.ArgumentParser(description='Generate roofline plots from folder structure')
    parser.add_argument('--base-path', default=r'D:\fritz',
                        help='Path to job archive folder')
    parser.add_argument('--output-dir', default='roofline_plots',
                        help='Output directory for plots')
    parser.add_argument('--done-file', default='done.txt',
                        help='File to track processed job prefixes')
    parser.add_argument('--processes', type=int, default=12,
                        help='Number of parallel processes to use (default: 12)')
    args = parser.parse_args()
    try:
        process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes)
    except Exception as e:
        print(f"Error processing jobs: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
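
# Example invocation (illustrative paths, not from the source; adjust to your archive):
#   python roofline_plotter.py --base-path /path/to/job-archive \
#       --output-dir roofline_plots --done-file done.txt --processes 8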