#!/usr/bin/env python3
import os
import sys
import gzip
import json
import argparse
from pathlib import Path
import pandas as pd
import numpy as np
from multiprocessing import Pool, cpu_count


def create_ml_example():
    """
    Example function showing how to use the generated dataframes for ML training
    """
    print("=== Machine Learning Example with Roofline Data ===")
    print("This is an example of how to use the generated dataframes for training ML models.")
    print("The dataframes contain features suitable for various ML tasks:")
    print("- Regression: Predict performance (performance_gflops)")
    print("- Classification: Predict roofline region (roofline_region)")
    print("- Clustering: Group similar performance patterns")
    print()

    # Example feature columns available in the dataframes
    feature_columns = [
        'arith_intensity', 'bandwidth_raw', 'flops_raw',
        'intensity_to_scalar_knee_ratio', 'intensity_to_simd_knee_ratio',
        'performance_to_scalar_peak_ratio', 'performance_to_simd_peak_ratio',
        'bandwidth_to_memory_bw_ratio', 'efficiency',
        'is_memory_bound', 'is_scalar_compute_bound',
        'is_simd_compute_bound', 'is_compute_bound'
    ]
    target_columns = [
        'performance_gflops',  # For regression
        'roofline_region',     # For classification
        'efficiency'           # For regression
    ]

    print("Available features:")
    for i, feature in enumerate(feature_columns, 1):
        print(f"  {i:2d}. {feature}")
    print("\nAvailable targets:")
    for i, target in enumerate(target_columns, 1):
        print(f"  {i:2d}. {target}")

    print("\nExample scikit-learn code:")
    print("""
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

# Load dataframe
df = pd.read_csv('path/to/dataframe.csv')

# Prepare features and target
features = ['arith_intensity', 'bandwidth_raw', 'flops_raw', 'efficiency']
X = df[features]
y = df['performance_gflops']

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
print(f"R² Score: {r2_score(y_test, y_pred):.4f}")
print(f"RMSE: {mean_squared_error(y_test, y_pred) ** 0.5:.4f}")
""")


if __name__ == "__main__":
    # Check if this is being run as an example
    if len(sys.argv) > 1 and sys.argv[1] == '--ml-example':
        create_ml_example()
        sys.exit(0)
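
# Illustrative sketch (not part of the original pipeline): the printed example above
# covers the regression task; this shows the analogous classification task of
# predicting 'roofline_region' from the same feature columns. The CSV path and the
# function name are placeholders.
def classification_example_sketch(dataframe_csv='path/to/dataframe.csv'):
    """Hedged example: train a classifier on the 'roofline_region' label."""
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score

    df = pd.read_csv(dataframe_csv)
    features = ['arith_intensity', 'bandwidth_raw', 'flops_raw', 'efficiency']
    X = df[features]
    y = df['roofline_region']  # categorical target: memory_bound / scalar_compute_bound / ...

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)
    print(f"Accuracy: {accuracy_score(y_test, clf.predict(X_test)):.4f}")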

def load_job_data_from_folder(job_path):
    """Load job data from folder"""
    job_path = Path(job_path)

    # Load data.json.gz
    data_file_path = job_path / "data.json.gz"
    if not data_file_path.exists():
        raise FileNotFoundError(f"data.json.gz not found in {job_path}")
    with gzip.open(data_file_path, 'rt') as f:
        data = json.load(f)

    # Load meta.json
    meta_file_path = job_path / "meta.json"
    if not meta_file_path.exists():
        raise FileNotFoundError(f"meta.json not found in {job_path}")
    with open(meta_file_path, 'r') as f:
        meta = json.load(f)

    return data, meta


def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
    """Load cluster configuration data from folder"""
    config_path = Path(base_path) / cluster_path
    if not config_path.exists():
        raise FileNotFoundError(f"cluster.json not found at {config_path}")
    with open(config_path, 'r') as f:
        return json.load(f)


def find_job_directories(base_path):
    """Find all job directories in the folder structure"""
    base_path = Path(base_path)
    job_dirs = []

    # Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
    for prefix_dir in base_path.iterdir():
        if not prefix_dir.is_dir():
            continue
        for suffix_dir in prefix_dir.iterdir():
            if not suffix_dir.is_dir():
                continue
            for timestamp_dir in suffix_dir.iterdir():
                if not timestamp_dir.is_dir():
                    continue
                # Check if this directory contains both meta.json and data.json.gz
                if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
                    job_dirs.append(timestamp_dir)

    return job_dirs


def load_done_list(done_file_path):
    """Load list of already processed job prefixes from done.txt"""
    done_file = Path(done_file_path)
    if not done_file.exists():
        return set()
    with open(done_file, 'r') as f:
        return set(line.strip() for line in f if line.strip())


def save_done_list(done_file_path, done_set):
    """Save list of processed job prefixes to done.txt"""
    with open(done_file_path, 'w') as f:
        for prefix in sorted(done_set):
            f.write(f"{prefix}\n")


def navigate_nested_dict(d, path):
    """Navigate a nested dictionary using a dot-separated path with array indices"""
    if not path:
        return d
    current = d
    parts = path.split('.')
    for part in parts:
        if '[' in part and ']' in part:
            # Handle explicit array access syntax (e.g., "node[0]")
            array_name = part.split('[')[0]
            index = int(part.split('[')[1].split(']')[0])
            current = current[array_name][index]
        elif part.isdigit():
            # Handle numeric indices for list access
            current = current[int(part)]
        else:
            # Handle regular dict access
            current = current[part]
    return current


def get_subcluster_config(cluster_config, subcluster_name):
    """Return the index of the subcluster with the given name (0 if not found)"""
    subclusters = cluster_config.get("subClusters", [])
    for i, subcluster in enumerate(subclusters):
        if subcluster.get("name") == subcluster_name:
            return i
    return 0  # Default to first subcluster if not found
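
# Illustrative usage sketch for navigate_nested_dict (toy data, not from a real
# cluster.json): paths use dots for dict keys, and either bare digits or the
# "name[idx]" syntax for list indices.
def _navigate_nested_dict_example():
    toy = {"subClusters": [{"flopRateScalar": {"value": 48, "unit": {"prefix": "G"}}}]}
    assert navigate_nested_dict(toy, "subClusters.0.flopRateScalar.value") == 48
    assert navigate_nested_dict(toy, "subClusters[0].flopRateScalar.unit.prefix") == "G"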

def get_performance_value(cluster_config, subcluster_idx, metric_path):
    """Extract a performance value with its proper unit prefix"""
    # Build the full path
    full_path = f"subClusters.{subcluster_idx}.{metric_path}"
    # Extract values
    value_parts = full_path.split('.')

    # Direct dictionary traversal instead of using navigate_nested_dict
    current = cluster_config
    for part in value_parts:
        if part.isdigit():
            current = current[int(part)]
        else:
            current = current[part]

    value = current.get("value", 0)

    # Get prefix if available
    prefix = ""
    if "unit" in current and "prefix" in current["unit"]:
        prefix = current["unit"]["prefix"]

    # Apply prefix multiplier
    prefix_multipliers = {
        'K': 1e3, 'M': 1e6, 'G': 1e9,
        'T': 1e12, 'P': 1e15, '': 1.0
    }
    multiplier = prefix_multipliers.get(prefix, 1.0)
    return value * multiplier
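
# Self-contained worked example of the unit-prefix handling above (toy config,
# illustrative values only): a stored value of 48 with prefix 'G' is scaled to
# base units, i.e. 48 GFLOP/s -> 48 * 1e9 FLOP/s.
def _get_performance_value_example():
    toy_cfg = {"subClusters": [{"name": "main",
                                "flopRateScalar": {"value": 48, "unit": {"prefix": "G"}}}]}
    assert get_performance_value(toy_cfg, 0, "flopRateScalar") == 48 * 1e9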

def create_roofline_dataframe(data, meta, cluster_config, output_path=None, save_format='both'):
    """Create roofline dataframe with data points for ML training

    Args:
        data: Job performance data
        meta: Job metadata
        cluster_config: Cluster configuration
        output_path: Path to save the dataframe (without extension)
        save_format: Format to save ('csv', 'pickle', 'parquet', 'hdf5', 'both', 'compressed')
    """
    # Get number of nodes and subcluster name
    num_nodes = meta['numNodes']
    subcluster_name = meta.get('subCluster', 'main')  # Default to 'main' if not specified

    # Get subcluster index from the cluster configuration
    subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)

    # Get performance ceilings from cluster configuration
    scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
    simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
    mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")

    print(f"=== Roofline DataFrame Configuration ===")
    print(f"Subcluster: {subcluster_name}")
    print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
    print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
    print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
    print(f"Number of Nodes: {num_nodes}")

    # Convert to GFLOP/s for consistency
    scalar_peak_gflops = scalar_flops / 1e9
    simd_peak_gflops = simd_flops / 1e9
    mem_bw_bytes = mem_bw  # Already in bytes/s from get_performance_value

    # Calculate knee points for rooflines
    scalar_knee_intensity = scalar_flops / mem_bw_bytes  # FLOP/byte
    simd_knee_intensity = simd_flops / mem_bw_bytes      # FLOP/byte
    print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
    print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")
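    # Worked example of the knee-point arithmetic above (illustrative numbers only,
    # not taken from any particular cluster.json): with a SIMD peak of 2000 GFLOP/s
    # and a memory bandwidth of 200 GB/s, the SIMD knee sits at 2000 / 200 = 10
    # FLOP/byte; points left of the knee are bandwidth-limited, points to the right
    # are limited by the compute roof.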
    print(f"\n=== Data Points ===")
    print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
    print("-" * 75)

    # Initialize lists to collect data points
    data_points = []

    # Statistics for data filtering
    total_data_points = 0
    filtered_invalid = 0
    filtered_above_roof = 0
    filtered_out_of_bounds = 0

    # Fixed bounds for consistency with original plotting
    x_min, x_max = 0.01, 1000.0  # FLOP/byte
    y_min, y_max = 1.0, 10000.0  # GFLOP/s

    for node_num in range(num_nodes):
        try:
            # Extract data series
            mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
            flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])
            if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
                continue

            mem_bw_data = mem_bw_series[node_num].get("data", [])
            flops_data = flops_series[node_num].get("data", [])

            # Ensure we have the same number of time steps for both metrics
            num_timesteps = min(len(mem_bw_data), len(flops_data))
            if num_timesteps == 0:
                continue
            total_data_points += num_timesteps

            # Process each time step
            for t_idx in range(num_timesteps):
                # Get data values (assuming raw data is already in appropriate units)
                bw_raw = mem_bw_data[t_idx]    # Memory bandwidth (raw value)
                flops_raw = flops_data[t_idx]  # Performance in FLOP/s (raw value)

                # Skip if we have invalid data (None, NaN, or non-positive values)
                if (bw_raw is None or flops_raw is None or
                        (isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
                        (isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
                        bw_raw <= 0 or flops_raw <= 0):
                    filtered_invalid += 1
                    continue

                # For arithmetic intensity calculation, we need consistent units
                flops_gflops = flops_raw  # Assume raw is already in GFLOP/s
                bw_gb_per_sec = bw_raw    # Assume raw is in GB/s

                # Calculate AI in FLOP/byte: (GFLOP/s) / (GB/s) = FLOP/byte
                arith_intensity = flops_gflops / bw_gb_per_sec

                # Additional validation: check for reasonable ranges
                # Skip if arithmetic intensity is too extreme (likely measurement error)
                if arith_intensity < 1e-6 or arith_intensity > 1e6:
                    filtered_invalid += 1
                    continue

                # Use raw FLOPS value for analysis (assume it's already in GFLOP/s)
                performance_gflops = flops_raw

                # Check if performance exceeds the theoretical maximum (above roofline)
                # Use the higher of SIMD or scalar peak performance as the ceiling
                max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)
                # Also check against memory bandwidth ceiling at this intensity
                mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9)  # Convert to GFLOP/s
                effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)

                # Skip unrealistic data points that are significantly above the roofline
                # Allow for some measurement tolerance (e.g., 5% above theoretical max)
                if performance_gflops > effective_ceiling * 1.05:
                    filtered_above_roof += 1
                    continue

                # Print key data points for monitoring
                if t_idx % 10 == 0:  # Print every 10th point to avoid spam
                    print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_gflops:15.2f} {arith_intensity:15.4f}")

                # Check if points are within our fixed bounds
                if (x_min <= arith_intensity <= x_max and
                        y_min <= performance_gflops <= y_max):
                    # Determine which roofline region this point falls into
                    if arith_intensity <= min(scalar_knee_intensity, simd_knee_intensity):
                        # Memory-bound region
                        roofline_region = "memory_bound"
                        theoretical_max = arith_intensity * (mem_bw_bytes / 1e9)
                    elif arith_intensity <= max(scalar_knee_intensity, simd_knee_intensity):
                        # Mixed region - determine which compute roof is limiting
                        if scalar_knee_intensity <= arith_intensity <= simd_knee_intensity:
                            roofline_region = "scalar_compute_bound"
                            theoretical_max = scalar_peak_gflops
                        else:
                            roofline_region = "simd_compute_bound"
                            theoretical_max = simd_peak_gflops
                    else:
                        # Compute-bound region
                        roofline_region = "compute_bound"
                        theoretical_max = max(simd_peak_gflops, scalar_peak_gflops)

                    # Calculate efficiency relative to theoretical maximum
                    efficiency = performance_gflops / theoretical_max if theoretical_max > 0 else 0

                    # Create data point dictionary
                    data_point = {
                        'node_num': node_num,
                        'timestep': t_idx,
                        'bandwidth_raw': bw_raw,
                        'flops_raw': flops_raw,
                        'performance_gflops': performance_gflops,
                        'arith_intensity': arith_intensity,
                        'roofline_region': roofline_region,
                        'theoretical_max_gflops': theoretical_max,
                        'efficiency': efficiency,
                        'scalar_peak_gflops': scalar_peak_gflops,
                        'simd_peak_gflops': simd_peak_gflops,
                        'memory_bw_gbs': mem_bw_bytes / 1e9,
                        'scalar_knee_intensity': scalar_knee_intensity,
                        'simd_knee_intensity': simd_knee_intensity,
                        'subcluster_name': subcluster_name,
                        'job_id': meta.get('jobId', 'unknown'),
                        'duration': meta.get('duration', 0)
                    }
                    data_points.append(data_point)
                else:
                    filtered_out_of_bounds += 1
        except (KeyError, IndexError, ValueError) as e:
            print(f"Warning: Error processing data for node {node_num}: {e}")
            continue

    # Print filtering statistics
    valid_points = len(data_points)
    print(f"\n=== Data Filtering Statistics ===")
    print(f"Total data points processed: {total_data_points}")
    print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
    print(f"Filtered out - Above roofline: {filtered_above_roof}")
    print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
    print(f"Valid data points collected: {valid_points}")
    if total_data_points > 0:
        print(f"Data retention rate: {(valid_points / total_data_points) * 100:.1f}%")
    else:
        print("No data points")

    if not data_points:
        print("\nNo valid data points found within the bounds - skipping dataframe creation")
        return False, None  # Return False to indicate no dataframe was created

    # Create DataFrame from collected data points
    df = pd.DataFrame(data_points)

    # Add derived features useful for ML
    df['intensity_to_scalar_knee_ratio'] = df['arith_intensity'] / df['scalar_knee_intensity']
    df['intensity_to_simd_knee_ratio'] = df['arith_intensity'] / df['simd_knee_intensity']
    df['performance_to_scalar_peak_ratio'] = df['performance_gflops'] / df['scalar_peak_gflops']
    df['performance_to_simd_peak_ratio'] = df['performance_gflops'] / df['simd_peak_gflops']
    df['bandwidth_to_memory_bw_ratio'] = df['bandwidth_raw'] / df['memory_bw_gbs']

    # Add categorical encodings for roofline regions
    df['is_memory_bound'] = (df['roofline_region'] == 'memory_bound').astype(int)
    df['is_scalar_compute_bound'] = (df['roofline_region'] == 'scalar_compute_bound').astype(int)
    df['is_simd_compute_bound'] = (df['roofline_region'] == 'simd_compute_bound').astype(int)
    df['is_compute_bound'] = (df['roofline_region'] == 'compute_bound').astype(int)

    print(f"\nCreated DataFrame with {len(df)} rows and {len(df.columns)} columns")
    print(f"Columns: {list(df.columns)}")

    # Save DataFrame if output path is provided
    if output_path:
        # Create output directory if it doesn't exist
        output_dir = Path(output_path).parent
        output_dir.mkdir(parents=True, exist_ok=True)

        saved_files = []
        base_path = Path(output_path)

        if save_format in ['csv', 'both', 'compressed']:
            # Save as CSV
            if save_format == 'compressed':
                csv_path = base_path.with_suffix('.csv.gz')
                df.to_csv(csv_path, index=False, compression='gzip')
                print(f"  Compressed CSV: {csv_path}")
            else:
                csv_path = base_path.with_suffix('.csv')
                df.to_csv(csv_path, index=False)
                print(f"  CSV: {csv_path}")
            saved_files.append(str(csv_path))

        if save_format in ['pickle', 'both', 'compressed']:
            # Save as pickle
            if save_format == 'compressed':
                pickle_path = base_path.with_suffix('.pkl.gz')
                df.to_pickle(pickle_path, compression='gzip')
                print(f"  Compressed Pickle: {pickle_path}")
            else:
                pickle_path = base_path.with_suffix('.pkl')
                df.to_pickle(pickle_path)
                print(f"  Pickle: {pickle_path}")
            saved_files.append(str(pickle_path))

        if save_format == 'parquet':
            # Save as Parquet (compressed by default)
            try:
                parquet_path = base_path.with_suffix('.parquet')
                df.to_parquet(parquet_path, index=False)
                print(f"  Parquet: {parquet_path}")
                saved_files.append(str(parquet_path))
            except ImportError:
                print("  Warning: Parquet format requires 'pyarrow' or 'fastparquet'. Install with: pip install pyarrow")

        if save_format == 'hdf5':
            # Save as HDF5
            try:
                hdf5_path = base_path.with_suffix('.h5')
                df.to_hdf(hdf5_path, key='dataframe', mode='w', index=False)
                print(f"  HDF5: {hdf5_path}")
                saved_files.append(str(hdf5_path))
            except ImportError:
                print("  Warning: HDF5 format requires 'tables'. Install with: pip install tables")

        if saved_files:
            print(f"\nSuccessfully saved dataframe in {save_format} format(s)")
            print(f"Files saved: {', '.join(saved_files)}")
        else:
            print(f"\nWarning: No files saved. Format '{save_format}' may not be supported or required packages missing.")
        return True, df  # Return True and the dataframe
    else:
        print("\nDataFrame created but not saved (no output path provided)")
        return True, df  # Return True and the dataframe
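
# Illustrative sketch (not part of the original pipeline): reading the saved per-job
# dataframes back for ML work. The glob pattern assumes the default output layout
# produced above ({output_dir}/{job_id_prefix}/{job_id}_{timestamp}_dataframe.csv).
def load_all_dataframes_sketch(output_dir='roofline_dataframes'):
    """Hedged helper: concatenate every saved CSV dataframe into one DataFrame."""
    frames = [pd.read_csv(p) for p in Path(output_dir).glob('*/*_dataframe.csv')]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)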

def process_single_job(args):
    """Process a single job - designed for multiprocessing"""
    job_path, cluster_config, output_base_dir, save_format = args
    try:
        # Parse job directory structure: {prefix}/{suffix}/{timestamp}
        parts = job_path.parts
        if len(parts) < 3:
            return False, f"Invalid job path structure: {job_path}"
        job_id_prefix = parts[-3]
        job_id_suffix = parts[-2]
        timestamp = parts[-1]
        job_id = job_id_prefix + job_id_suffix

        # Load job data
        data, meta = load_job_data_from_folder(job_path)

        # Check job duration - skip jobs less than 60 seconds
        duration = meta.get('duration', 0)
        if duration < 60:
            return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"

        # Create output directory for this job prefix
        output_dir = Path(output_base_dir) / job_id_prefix
        output_dir.mkdir(parents=True, exist_ok=True)

        # Generate dataframe
        base_filename = f"{job_id}_{timestamp}"
        dataframe_path = output_dir / f"{base_filename}_dataframe"
        success, df = create_roofline_dataframe(data, meta, cluster_config,
                                                output_path=str(dataframe_path),
                                                save_format=save_format)

        # Check if dataframe was created successfully
        if success and df is not None:
            return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s, {len(df)} data points)"
        else:
            return False, f"Skipped job {job_id}_{timestamp}: no valid data to create dataframe"
    except Exception as e:
        return False, f"Error processing job {job_path}: {e}"


def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None, save_format='both'):
    """Process all jobs in the folder structure using multiprocessing"""
    print(f"Processing jobs from: {base_path}")
    print(f"Output directory: {output_base_dir}")
    print(f"Save format: {save_format}")

    # Load cluster configuration
    try:
        cluster_config = load_cluster_config_from_folder(base_path)
        print("Loaded cluster configuration")
    except FileNotFoundError as e:
        print(f"Error loading cluster config: {e}")
        return

    # Load already processed jobs
    done_set = load_done_list(done_file_path)
    print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")

    # Find all job directories
    job_dirs = find_job_directories(base_path)
    print(f"Found {len(job_dirs)} total job directories")

    # Filter out already processed jobs
    filtered_job_dirs = []
    for job_path in job_dirs:
        parts = job_path.parts
        if len(parts) >= 3:
            job_id_prefix = parts[-3]
            if job_id_prefix not in done_set:
                filtered_job_dirs.append(job_path)
    print(f"Found {len(filtered_job_dirs)} unprocessed job directories")

    if not filtered_job_dirs:
        print("No new jobs to process")
        return

    # Determine number of processes
    if num_processes is None:
        num_processes = min(12, cpu_count())  # Use up to 12 cores or available cores
    print(f"Using {num_processes} processes for parallel processing")

    # Prepare arguments for multiprocessing
    process_args = [(job_path, cluster_config, output_base_dir, save_format)
                    for job_path in filtered_job_dirs]

    processed_count = 0
    skipped_count = 0
    processed_prefixes = set()

    # Process jobs in parallel
    if num_processes == 1:
        # Single-process execution for easier debugging
        results = [process_single_job(args) for args in process_args]
    else:
        # Parallel execution with a process pool
        with Pool(num_processes) as pool:
            results = pool.map(process_single_job, process_args)

    # Collect results
    for i, (success, message) in enumerate(results):
        print(message)
        if success:
            processed_count += 1
            # Extract job prefix for done tracking
            job_path = filtered_job_dirs[i]
            parts = job_path.parts
            if len(parts) >= 3:
                job_id_prefix = parts[-3]
                processed_prefixes.add(job_id_prefix)
        else:
            skipped_count += 1

    # Update done list with newly processed prefixes
    if processed_prefixes:
        updated_done_set = done_set | processed_prefixes
        save_done_list(done_file_path, updated_done_set)
        print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")

    print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs (duration < 60s or no valid data)")
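
# Illustrative debugging sketch (hypothetical paths): process_single_job takes a single
# argument tuple so it can be mapped over a Pool; the same tuple shape can be reused to
# run one job serially when diagnosing a problem directory.
def debug_single_job_sketch(base_path='jobs', job_dir='jobs/123/456/1700000000',
                            output_dir='roofline_dataframes'):
    cluster_config = load_cluster_config_from_folder(base_path)
    ok, message = process_single_job((Path(job_dir), cluster_config, output_dir, 'csv'))
    print(ok, message)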

def main():
    parser = argparse.ArgumentParser(description='Generate roofline dataframes from folder structure for ML training')
    parser.add_argument('--base-path', default=r'D:\fritz',
                        help='Path to job archive folder')
    parser.add_argument('--output-dir', default='roofline_dataframes',
                        help='Output directory for dataframes')
    parser.add_argument('--done-file', default='done.txt',
                        help='File to track processed job prefixes')
    parser.add_argument('--processes', type=int, default=12,
                        help='Number of parallel processes to use (default: 12)')
    parser.add_argument('--save-format', default='both',
                        choices=['csv', 'pickle', 'both', 'compressed', 'parquet', 'hdf5'],
                        help='Format to save dataframes (default: both)')
    args = parser.parse_args()

    try:
        process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes, args.save_format)
    except Exception as e:
        print(f"Error processing jobs: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
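
# Example invocations (script filename and paths are placeholders, adjust to your setup):
#   python generate_roofline_dataframes.py --base-path /path/to/job-archive --output-dir roofline_dataframes
#   python generate_roofline_dataframes.py --save-format compressed --processes 4
#   python generate_roofline_dataframes.py --ml-example   # print the scikit-learn usage example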