#!/usr/bin/env python3
"""Generate roofline plots (color and grayscale PNGs) from a job archive folder structure."""
import os
import gzip
import json
import argparse
from pathlib import Path
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from multiprocessing import Pool, cpu_count
import functools


def load_job_data_from_folder(job_path):
    """Load job data (data.json.gz and meta.json) from a job folder"""
    job_path = Path(job_path)

    # Load data.json.gz
    data_file_path = job_path / "data.json.gz"
    if not data_file_path.exists():
        raise FileNotFoundError(f"data.json.gz not found in {job_path}")
    with gzip.open(data_file_path, 'rt') as f:
        data = json.load(f)

    # Load meta.json
    meta_file_path = job_path / "meta.json"
    if not meta_file_path.exists():
        raise FileNotFoundError(f"meta.json not found in {job_path}")
    with open(meta_file_path, 'r') as f:
        meta = json.load(f)

    return data, meta


def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
    """Load cluster configuration data from folder"""
    config_path = Path(base_path) / cluster_path
    if not config_path.exists():
        raise FileNotFoundError(f"cluster.json not found at {config_path}")
    with open(config_path, 'r') as f:
        return json.load(f)


def find_job_directories(base_path):
    """Find all job directories in the folder structure"""
    base_path = Path(base_path)
    job_dirs = []

    # Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
    for prefix_dir in base_path.iterdir():
        if not prefix_dir.is_dir():
            continue
        for suffix_dir in prefix_dir.iterdir():
            if not suffix_dir.is_dir():
                continue
            for timestamp_dir in suffix_dir.iterdir():
                if not timestamp_dir.is_dir():
                    continue
                # Check if this directory contains both meta.json and data.json.gz
                if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
                    job_dirs.append(timestamp_dir)

    return job_dirs


def load_done_list(done_file_path):
    """Load list of already processed job prefixes from done.txt"""
    done_file = Path(done_file_path)
    if not done_file.exists():
        return set()
    with open(done_file, 'r') as f:
        return set(line.strip() for line in f if line.strip())


def save_done_list(done_file_path, done_set):
    """Save list of processed job prefixes to done.txt"""
    with open(done_file_path, 'w') as f:
        for prefix in sorted(done_set):
            f.write(f"{prefix}\n")


def navigate_nested_dict(d, path):
    """Navigate a nested dictionary using a dot-separated path with array indices"""
    if not path:
        return d
    current = d
    parts = path.split('.')
    for part in parts:
        if '[' in part and ']' in part:
            # Handle explicit array access syntax (e.g., "node[0]")
            array_name = part.split('[')[0]
            index = int(part.split('[')[1].split(']')[0])
            current = current[array_name][index]
        elif part.isdigit():
            # Handle numeric indices for list access
            current = current[int(part)]
        else:
            # Handle regular dict access
            current = current[part]
    return current


def get_subcluster_config(cluster_config, subcluster_name):
    """Get the index of a specific subcluster by name"""
    subclusters = cluster_config.get("subClusters", [])
    for i, subcluster in enumerate(subclusters):
        if subcluster.get("name") == subcluster_name:
            return i
    return 0  # Default to first subcluster if not found


def get_performance_value(cluster_config, subcluster_idx, metric_path):
    """Extract a performance value with its unit prefix applied"""
    # Build the full path
    full_path = f"subClusters.{subcluster_idx}.{metric_path}"

    # Extract values
    value_parts = full_path.split('.')

    # Direct dictionary traversal instead of using navigate_nested_dict
    current = cluster_config
    for part in value_parts:
        if part.isdigit():
            current = current[int(part)]
        else:
            current = current[part]

    value = current.get("value", 0)

    # Get prefix if available
    prefix = ""
    if "unit" in current and "prefix" in current["unit"]:
        prefix = current["unit"]["prefix"]

    # Apply prefix multiplier
    prefix_multipliers = {'K': 1e3, 'M': 1e6, 'G': 1e9, 'T': 1e12, 'P': 1e15, '': 1.0}
    multiplier = prefix_multipliers.get(prefix, 1.0)

    return value * multiplier


def create_roofline_plot(data, meta, cluster_config, output_path=None, colorful=True,
                         show_labels=True, size='normal'):
    """Create roofline plot with fixed axes for CNN training consistency"""
    # Get number of nodes and subcluster name
    num_nodes = meta['numNodes']
    subcluster_name = meta.get('subCluster', 'main')  # Default to 'main' if not specified

    # Get subcluster index from the cluster configuration
    subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)

    # Get performance ceilings from cluster configuration
    scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
    simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
    mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")

    print(f"=== Roofline Plot Configuration ===")
    print(f"Subcluster: {subcluster_name}")
    print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
    print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
    print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
    print(f"Number of Nodes: {num_nodes}")

    # Configure plot size based on size parameter
    if size == 'small':
        figsize = (3, 3)
        dpi = 100
        fontsize_labels = 10
        fontsize_title = 12
        fontsize_legend = 8
        fontsize_annotations = 8
    elif size == 'medium':
        figsize = (6, 6)
        dpi = 100
        fontsize_labels = 12
        fontsize_title = 14
        fontsize_legend = 10
        fontsize_annotations = 10
    else:  # normal
        figsize = (8, 6)
        dpi = 100
        fontsize_labels = 12
        fontsize_title = 14
        fontsize_legend = 10
        fontsize_annotations = 10

    # Fixed plot configuration for CNN training consistency
    fig = plt.figure(figsize=figsize, dpi=dpi, frameon=False)
    ax = fig.add_subplot(1, 1, 1)

    # Fixed axis ranges for CNN training consistency
    x_min, x_max = 0.01, 1000.0  # FLOP/byte
    y_min, y_max = 1.0, 10000.0  # GFLOP/s
    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_xscale('log')
    ax.set_yscale('log')

    # Add axis labels based on show_labels parameter
    if show_labels:
        ax.set_xlabel('Arithmetic Intensity [FLOP/byte]', fontsize=fontsize_labels)
        ax.set_ylabel('Performance [GFLOPS]', fontsize=fontsize_labels)
    else:
        ax.set_xlabel('')
        ax.set_ylabel('')

    # Set consistent tick marks with minor ticks for log scale
    ax.set_xticks([0.01, 0.1, 1, 10, 100, 1000])
    ax.set_yticks([1, 10, 100, 1000, 10000])

    # Add minor ticks for log scale
    ax.set_xticks([0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09,
                   0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
                   2, 3, 4, 5, 6, 7, 8, 9,
                   20, 30, 40, 50, 60, 70, 80, 90,
                   200, 300, 400, 500, 600, 700, 800, 900], minor=True)
    ax.set_yticks([2, 3, 4, 5, 6, 7, 8, 9,
                   20, 30, 40, 50, 60, 70, 80, 90,
                   200, 300, 400, 500, 600, 700, 800, 900,
                   2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000], minor=True)

    # Light colored grid
    ax.grid(True, which='major', alpha=0.4, linestyle='-', linewidth=0.8, color='gray')
    ax.grid(True, which='minor', alpha=0.2, linestyle='-', linewidth=0.5, color='lightgray')

    # Set title and legend font sizes
    fontsize_title = 14 if size == 'normal' else 12

    # Calculate knee points for the rooflines
    # (mem_bw is already in bytes/s because get_performance_value applies the unit prefix)
    mem_bw_bytes = mem_bw
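    # Roofline model: attainable performance at arithmetic intensity AI is
    # min(peak_flops, AI * mem_bw), so each compute roof meets the memory roof at
    # AI_knee = peak_flops / mem_bw [FLOP/byte]. For example, a 1000 GFLOP/s peak
    # with 100 GB/s of memory bandwidth gives a knee at 10 FLOP/byte.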
    scalar_knee_intensity = scalar_flops / mem_bw_bytes  # FLOP/byte
    simd_knee_intensity = simd_flops / mem_bw_bytes  # FLOP/byte

    # Convert to GFLOP/s for plotting
    scalar_peak_gflops = scalar_flops / 1e9
    simd_peak_gflops = simd_flops / 1e9

    print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
    print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")

    # Create intensity range for roofline plotting
    intensity_range = np.logspace(np.log10(x_min), np.log10(x_max), 1000)

    # Define colors based on colorful flag
    if colorful:
        memory_color = 'black'
        scalar_color = 'red'
        simd_color = 'blue'
        data_color = 'orange'
    else:
        memory_color = 'black'
        scalar_color = 'darkgray'
        simd_color = 'gray'
        data_color = 'black'

    # Memory bandwidth roof (diagonal line); dividing mem_bw_bytes by 1e9 keeps the roof in GFLOP/s
    memory_roof = intensity_range * (mem_bw_bytes / 1e9)
    memory_roof = np.clip(memory_roof, y_min, y_max)

    # Plot memory bandwidth roof up to the vector knee point
    memory_mask = intensity_range <= min(simd_knee_intensity, x_max)
    ax.plot(intensity_range[memory_mask], memory_roof[memory_mask],
            color=memory_color, linewidth=2, linestyle='-',
            label=f'Memory BW Roof ({mem_bw_bytes/1e9:.1f} GB/s)')

    # Scalar compute roof (horizontal line)
    if scalar_knee_intensity <= x_max and scalar_peak_gflops <= y_max:
        scalar_x_range = intensity_range[intensity_range >= scalar_knee_intensity]
        scalar_y_values = np.full_like(scalar_x_range, scalar_peak_gflops)
        ax.plot(scalar_x_range, scalar_y_values,
                color=scalar_color, linewidth=2, linestyle='-',
                label=f'Scalar Roof ({scalar_peak_gflops:.1f} GFLOPS)')

    # SIMD compute roof (horizontal line)
    if simd_knee_intensity <= x_max and simd_peak_gflops <= y_max:
        simd_x_range = intensity_range[intensity_range >= simd_knee_intensity]
        simd_y_values = np.full_like(simd_x_range, simd_peak_gflops)
        ax.plot(simd_x_range, simd_y_values,
                color=simd_color, linewidth=2, linestyle='-',
                label=f'SIMD Roof ({simd_peak_gflops:.1f} GFLOPS)')

    print(f"\n=== Data Points ===")
    print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
    print("-" * 75)

    # Process job data points
    has_valid_data = False
    all_intensities = []
    all_performances = []
    all_timestamps = []  # Track timestamps for coloring
    all_nodes = []  # Track node numbers for coloring

    # Statistics for data filtering
    total_data_points = 0
    filtered_invalid = 0
    filtered_above_roof = 0
    filtered_out_of_bounds = 0

    for node_num in range(num_nodes):
        try:
            # Extract data series
            mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
            flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])
            if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
                continue

            mem_bw_data = mem_bw_series[node_num].get("data", [])
            flops_data = flops_series[node_num].get("data", [])

            # Ensure we have the same number of time steps for both metrics
            num_timesteps = min(len(mem_bw_data), len(flops_data))
            if num_timesteps == 0:
                continue
            total_data_points += num_timesteps

            # Process each time step
            for t_idx in range(num_timesteps):
                # Get data values (raw data is assumed to already be in appropriate units)
                bw_raw = mem_bw_data[t_idx]  # Memory bandwidth (raw value)
                flops_raw = flops_data[t_idx]  # Performance (raw value)

                # Skip invalid data (None, NaN, Inf, or non-positive values)
                if (bw_raw is None or flops_raw is None or
                        (isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
                        (isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
                        bw_raw <= 0 or flops_raw <= 0):
                    filtered_invalid += 1
                    continue

                # Arithmetic intensity needs consistent units: the raw series are assumed
                # to be in GB/s and GFLOP/s, so AI = (GFLOP/s) / (GB/s) = FLOP/byte
                flops_gflops = flops_raw
                bw_gb_per_sec = bw_raw
                arith_intensity = flops_gflops / bw_gb_per_sec

                # Additional validation: skip extreme arithmetic intensities (likely measurement error)
                if arith_intensity < 1e-6 or arith_intensity > 1e6:
                    filtered_invalid += 1
                    continue

                # Use raw FLOPS value for plotting (assumed to already be in GFLOP/s)
                performance_for_plot = flops_raw

                # Check whether performance exceeds the theoretical maximum (above the roofline).
                # Use the higher of SIMD or scalar peak performance as the ceiling, and also
                # check against the memory bandwidth ceiling at this intensity.
                max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)
                mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9)  # GFLOP/s
                effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)

                # Skip unrealistic data points significantly above the roofline,
                # allowing some measurement tolerance (5% above the theoretical max)
                if performance_for_plot > effective_ceiling * 1.05:
                    filtered_above_roof += 1
                    continue

                # Print key data points for monitoring (every 10th point to avoid spam)
                if t_idx % 10 == 0:
                    print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_for_plot:15.2f} {arith_intensity:15.4f}")

                # Check if points are within our fixed bounds
                if x_min <= arith_intensity <= x_max and y_min <= performance_for_plot <= y_max:
                    all_intensities.append(arith_intensity)
                    all_performances.append(performance_for_plot)
                    all_timestamps.append(t_idx)  # Store timestamp for coloring
                    all_nodes.append(node_num)  # Store node number for coloring
                    has_valid_data = True
                else:
                    filtered_out_of_bounds += 1
        except (KeyError, IndexError, ValueError) as e:
            print(f"Warning: Error processing data for node {node_num}: {e}")
            continue

    # Plot all data points with timestamp-based coloring
    if has_valid_data:
        if colorful:
            # Define color palette for different nodes
            node_colors = ['red', 'blue', 'green', 'orange', 'purple',
                           'brown', 'pink', 'gray', 'olive', 'cyan']

            # Group data by node for plotting
            node_data = {}
            for intensity, performance, timestamp, node_num in zip(all_intensities, all_performances,
                                                                    all_timestamps, all_nodes):
                if node_num not in node_data:
                    node_data[node_num] = {'intensities': [], 'performances': [], 'timestamps': []}
                node_data[node_num]['intensities'].append(intensity)
                node_data[node_num]['performances'].append(performance)
                node_data[node_num]['timestamps'].append(timestamp)

            # Plot each node with its own color, but with timestamp-based alpha
            for node_num in sorted(node_data.keys()):
                node_intensities = node_data[node_num]['intensities']
                node_performances = node_data[node_num]['performances']
                node_timestamps = node_data[node_num]['timestamps']

                # Get base color for this node
                base_color = node_colors[node_num % len(node_colors)]

                # Calculate alpha values based on timestamp progression within this node
                if len(node_timestamps) > 1:
                    min_time = min(node_timestamps)
                    max_time = max(node_timestamps)
                    alphas = []
                    for t in node_timestamps:
                        # Normalize timestamp to 0-1 range
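                        # (the first sample of the node maps to 0 and the last to 1; the alpha
                        #  ramp below then renders early samples faint and late ones opaque)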
                        normalized_time = (t - min_time) / (max_time - min_time) if max_time > min_time else 0
                        # Map to alpha: 0.3 (early) to 0.9 (late)
                        alpha = 0.3 + 0.6 * normalized_time
                        alphas.append(alpha)
                else:
                    # Single timestamp or no variation
                    alphas = [0.7] * len(node_timestamps)

                # Plot points for this node
                for intensity, performance, alpha in zip(node_intensities, node_performances, alphas):
                    ax.scatter(intensity, performance, s=20, alpha=alpha, c=base_color, edgecolors='none')
        else:
            # Grayscale mode: use timestamp-based grayscale coloring across all nodes
            if len(all_timestamps) > 1:
                min_time = min(all_timestamps)
                max_time = max(all_timestamps)
                colors = []
                alphas = []
                for t in all_timestamps:
                    # Normalize timestamp to 0-1 range
                    normalized_time = (t - min_time) / (max_time - min_time) if max_time > min_time else 0
                    # Map to grayscale: 0.0 (black) to 0.7 (light gray)
                    gray_value = 0.7 * normalized_time
                    colors.append(str(gray_value))
                    # Map to alpha: 0.3 (early) to 0.9 (late)
                    alpha = 0.3 + 0.6 * normalized_time
                    alphas.append(alpha)
            else:
                # Single timestamp or no variation
                colors = ['black'] * len(all_timestamps)
                alphas = [0.7] * len(all_timestamps)

            # Plot all points with timestamp-based coloring and alpha
            for intensity, performance, color, alpha in zip(all_intensities, all_performances, colors, alphas):
                ax.scatter(intensity, performance, s=20, alpha=alpha, c=color, edgecolors='none')

        # Print filtering statistics
        valid_points = len(all_intensities)
        print(f"\n=== Data Filtering Statistics ===")
        print(f"Total data points processed: {total_data_points}")
        print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
        print(f"Filtered out - Above roofline: {filtered_above_roof}")
        print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
        print(f"Valid data points plotted: {valid_points}")
        if total_data_points > 0:
            print(f"Data retention rate: {(valid_points/total_data_points)*100:.1f}%")
        else:
            print("No data points")
        print(f"\nPlotted {len(all_intensities)} valid data points with "
              f"{'node-based coloring' if colorful else 'timestamp-based coloring'}")
    else:
        print(f"\n=== Data Filtering Statistics ===")
        print(f"Total data points processed: {total_data_points}")
        print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
        print(f"Filtered out - Above roofline: {filtered_above_roof}")
        print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
        print(f"Valid data points plotted: 0")
        print("\nNo valid data points found within the plot bounds - skipping plot generation")
        plt.close(fig)
        return False  # No plot was generated

    # Add title and legend
    if show_labels:
        job_id = meta.get('jobId', 'unknown')
        ax.set_title(f'Roofline Plot - Job {job_id} ({subcluster_name})', fontsize=fontsize_title)
        ax.legend(loc='lower right', fontsize=fontsize_legend)

    # Save or display
    if output_path:
        plt.tight_layout()
        # Adjust DPI based on size for specific pixel dimensions
        if size == 'small':
            save_dpi = 100  # 300x300 pixels at 3x3 inches
        elif size == 'medium':
            save_dpi = 100  # 600x600 pixels at 6x6 inches
        else:
            save_dpi = 300  # High quality for normal size
        plt.savefig(output_path, format='png', dpi=save_dpi, bbox_inches='tight', facecolor='white')
        plt.close(fig)
        print(f"\nSaved roofline plot to: {output_path}")
        return True  # Plot was generated successfully
    else:
        plt.tight_layout()
        plt.show()
        print("\nDisplayed roofline plot")
        return True  # Plot was generated successfully


def process_single_job(args):
    """Process a single job - designed for multiprocessing"""
    job_path, cluster_config, output_base_dir = args
    try:
        # Parse job directory structure: {prefix}/{suffix}/{timestamp}
        parts = job_path.parts
        if len(parts) < 3:
            return False, f"Invalid job path structure: {job_path}"

        job_id_prefix = parts[-3]
        job_id_suffix = parts[-2]
        timestamp = parts[-1]
        job_id = job_id_prefix + job_id_suffix

        # Load job data
        data, meta = load_job_data_from_folder(job_path)

        # Check job duration - skip jobs shorter than 60 seconds
        duration = meta.get('duration', 0)
        if duration < 60:
            return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"

        # Create output directory for this job prefix
        output_dir = Path(output_base_dir) / job_id_prefix
        output_dir.mkdir(parents=True, exist_ok=True)

        # Generate both colorful and grayscale plots
        base_filename = f"{job_id}_{timestamp}"

        # Colorful plot
        colorful_path = output_dir / f"{base_filename}_color.png"
        colorful_success = create_roofline_plot(data, meta, cluster_config,
                                                output_path=str(colorful_path),
                                                colorful=True, show_labels=False, size='medium')

        # Grayscale plot
        grayscale_path = output_dir / f"{base_filename}_grayscale.png"
        grayscale_success = create_roofline_plot(data, meta, cluster_config,
                                                 output_path=str(grayscale_path),
                                                 colorful=False, show_labels=False, size='medium')

        # Check if at least one plot was generated
        if colorful_success or grayscale_success:
            return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s)"
        else:
            return False, f"Skipped job {job_id}_{timestamp}: no valid data to plot"
    except Exception as e:
        return False, f"Error processing job {job_path}: {e}"


def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None):
    """Process all jobs in the folder structure using multiprocessing"""
    print(f"Processing jobs from: {base_path}")
    print(f"Output directory: {output_base_dir}")

    # Load cluster configuration
    try:
        cluster_config = load_cluster_config_from_folder(base_path)
        print("Loaded cluster configuration")
    except FileNotFoundError as e:
        print(f"Error loading cluster config: {e}")
        return

    # Load already processed jobs
    done_set = load_done_list(done_file_path)
    print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")

    # Find all job directories
    job_dirs = find_job_directories(base_path)
    print(f"Found {len(job_dirs)} total job directories")

    # Filter out already processed jobs
    filtered_job_dirs = []
    for job_path in job_dirs:
        parts = job_path.parts
        if len(parts) >= 3:
            job_id_prefix = parts[-3]
            if job_id_prefix not in done_set:
                filtered_job_dirs.append(job_path)
    print(f"Found {len(filtered_job_dirs)} unprocessed job directories")

    if not filtered_job_dirs:
        print("No new jobs to process")
        return

    # Determine number of processes
    if num_processes is None:
        num_processes = min(12, cpu_count())  # Use up to 12 cores, or fewer if not available
    print(f"Using {num_processes} processes for parallel processing")

    # Prepare arguments for multiprocessing
    process_args = [(job_path, cluster_config, output_base_dir) for job_path in filtered_job_dirs]

    processed_count = 0
    skipped_count = 0
    processed_prefixes = set()

    # Process jobs in parallel
    if num_processes == 1:
        # Sequential processing (useful for debugging)
        results = [process_single_job(args) for args in process_args]
    else:
        # Parallel processing across worker processes
        with Pool(num_processes) as pool:
            results = pool.map(process_single_job, process_args)

    # Collect results
    for i, (success, message) in enumerate(results):
        print(message)
        if success:
            processed_count += 1
            # Extract job prefix for done tracking
            job_path = filtered_job_dirs[i]
            parts = job_path.parts
            if len(parts) >= 3:
                job_id_prefix = parts[-3]
                processed_prefixes.add(job_id_prefix)
        else:
            skipped_count += 1

    # Update done list with newly processed prefixes
    if processed_prefixes:
        updated_done_set = done_set | processed_prefixes
        save_done_list(done_file_path, updated_done_set)
        print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")

    print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs "
          f"(duration < 60s or no valid data)")


def main():
    parser = argparse.ArgumentParser(description='Generate roofline plots from folder structure')
    parser.add_argument('--base-path', default=r'D:\fritz', help='Path to job archive folder')
    parser.add_argument('--output-dir', default='roofline_plots', help='Output directory for plots')
    parser.add_argument('--done-file', default='done.txt', help='File to track processed job prefixes')
    parser.add_argument('--processes', type=int, default=12,
                        help='Number of parallel processes to use (default: 12)')
    args = parser.parse_args()

    try:
        process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes)
    except Exception as e:
        print(f"Error processing jobs: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
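

# ---------------------------------------------------------------------------
# Minimal synthetic example (a sketch for illustration only; it is never called by
# the pipeline above). All names and values below are made-up placeholders that
# merely mirror the structures this script reads: cluster.json with "subClusters"
# entries ("flopRateScalar", "flopRateSimd", "memoryBandwidth", each carrying a
# unit prefix and value), meta.json with "jobId"/"numNodes"/"subCluster", and
# data.json.gz with per-node series under data["mem_bw"]["node"]["series"] and
# data["flops_any"]["node"]["series"]. Calling the helper manually renders a
# single roofline plot without touching a job archive, e.g.
#     python -c "import roofline_plots as r; r._render_synthetic_example()"
# (assuming this file is saved as roofline_plots.py).
def _render_synthetic_example(output_path="example_roofline.png"):
    example_cluster = {
        "subClusters": [{
            "name": "main",
            # value * prefix multiplier, e.g. 432 with prefix 'G' -> 4.32e11 FLOP/s
            "flopRateScalar": {"unit": {"prefix": "G"}, "value": 48},
            "flopRateSimd": {"unit": {"prefix": "G"}, "value": 432},
            "memoryBandwidth": {"unit": {"prefix": "G"}, "value": 350},
        }]
    }
    example_meta = {"jobId": "example", "numNodes": 1, "subCluster": "main", "duration": 120}
    # Raw series values are assumed to be GB/s and GFLOP/s, matching the assumption
    # made inside create_roofline_plot()
    example_data = {
        "mem_bw": {"node": {"series": [{"data": [120.0, 180.0, 250.0, 300.0]}]}},
        "flops_any": {"node": {"series": [{"data": [60.0, 150.0, 300.0, 410.0]}]}},
    }
    return create_roofline_plot(example_data, example_meta, example_cluster,
                                output_path=output_path, colorful=True,
                                show_labels=True, size='medium')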