diff --git a/roofline_dataframe_generator.py b/roofline_dataframe_generator.py
new file mode 100644
index 0000000..cc5378b
--- /dev/null
+++ b/roofline_dataframe_generator.py
@@ -0,0 +1,646 @@
+#!/usr/bin/env python3
+import os
+import sys
+import gzip
+import json
+import argparse
+from pathlib import Path
+import pandas as pd
+import numpy as np
+from datetime import datetime
+from multiprocessing import Pool, cpu_count
+import functools
+
+def create_ml_example():
+    """
+    Example function showing how to use the generated dataframes for ML training
+    """
+    print("=== Machine Learning Example with Roofline Data ===")
+    print("This is an example of how to use the generated dataframes for training ML models.")
+    print("The dataframes contain features suitable for various ML tasks:")
+    print("- Regression: Predict performance (performance_gflops)")
+    print("- Classification: Predict roofline region (roofline_region)")
+    print("- Clustering: Group similar performance patterns")
+    print()
+
+    # Example feature columns available in the dataframes
+    feature_columns = [
+        'arith_intensity', 'bandwidth_raw', 'flops_raw',
+        'intensity_to_scalar_knee_ratio', 'intensity_to_simd_knee_ratio',
+        'performance_to_scalar_peak_ratio', 'performance_to_simd_peak_ratio',
+        'bandwidth_to_memory_bw_ratio', 'efficiency',
+        'is_memory_bound', 'is_scalar_compute_bound',
+        'is_simd_compute_bound', 'is_compute_bound'
+    ]
+
+    target_columns = [
+        'performance_gflops',  # For regression
+        'roofline_region',     # For classification
+        'efficiency'           # For regression
+    ]
+
+    print("Available features:")
+    for i, feature in enumerate(feature_columns, 1):
+        print(f"  {i:2d}. {feature}")
+
+    print("\nAvailable targets:")
+    for i, target in enumerate(target_columns, 1):
+        print(f"  {i:2d}. {target}")
+
+    print("\nExample scikit-learn code:")
+    print("""
+import pandas as pd
+from sklearn.model_selection import train_test_split
+from sklearn.ensemble import RandomForestRegressor
+from sklearn.metrics import mean_squared_error, r2_score
+
+# Load dataframe
+df = pd.read_csv('path/to/dataframe.csv')
+
+# Prepare features and target
+features = ['arith_intensity', 'bandwidth_raw', 'flops_raw', 'efficiency']
+X = df[features]
+y = df['performance_gflops']
+
+# Split data
+X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
+
+# Train model
+model = RandomForestRegressor(n_estimators=100, random_state=42)
+model.fit(X_train, y_train)
+
+# Evaluate
+y_pred = model.predict(X_test)
+print(f"R² Score: {r2_score(y_test, y_pred):.4f}")
+print(f"RMSE: {mean_squared_error(y_test, y_pred) ** 0.5:.4f}")
+""")
+
+if __name__ == "__main__":
+    # Check if this is being run as an example
+    if len(sys.argv) > 1 and sys.argv[1] == '--ml-example':
+        create_ml_example()
+        sys.exit(0)
+
+# ... existing code ...
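+
+# Illustrative helper (a minimal sketch, not referenced elsewhere in this script):
+# load a dataframe previously written by create_roofline_dataframe(), dispatching
+# on the file extension. Compression is inferred by pandas for the .gz variants,
+# and the HDF5 key 'dataframe' matches the key used when saving below.
+def load_roofline_dataframe(path):
+    """Load a saved roofline dataframe (.csv, .csv.gz, .pkl, .pkl.gz, .parquet, .h5)."""
+    path = Path(path)
+    name = path.name.lower()
+    if name.endswith(('.csv', '.csv.gz')):
+        return pd.read_csv(path)
+    if name.endswith(('.pkl', '.pkl.gz')):
+        return pd.read_pickle(path)
+    if name.endswith('.parquet'):
+        return pd.read_parquet(path)
+    if name.endswith('.h5'):
+        return pd.read_hdf(path, key='dataframe')
+    raise ValueError(f"Unrecognized dataframe file format: {path}")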
+
+def load_job_data_from_folder(job_path):
+    """Load job data from folder"""
+    job_path = Path(job_path)
+
+    # Load data.json.gz
+    data_file_path = job_path / "data.json.gz"
+    if not data_file_path.exists():
+        raise FileNotFoundError(f"data.json.gz not found in {job_path}")
+
+    with gzip.open(data_file_path, 'rt') as f:
+        data = json.load(f)
+
+    # Load meta.json
+    meta_file_path = job_path / "meta.json"
+    if not meta_file_path.exists():
+        raise FileNotFoundError(f"meta.json not found in {job_path}")
+
+    with open(meta_file_path, 'r') as f:
+        meta = json.load(f)
+
+    return data, meta
+
+def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
+    """Load cluster configuration data from folder"""
+    config_path = Path(base_path) / cluster_path
+    if not config_path.exists():
+        raise FileNotFoundError(f"cluster.json not found at {config_path}")
+
+    with open(config_path, 'r') as f:
+        return json.load(f)
+
+def find_job_directories(base_path):
+    """Find all job directories in the folder structure"""
+    base_path = Path(base_path)
+    job_dirs = []
+
+    # Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
+    for prefix_dir in base_path.iterdir():
+        if not prefix_dir.is_dir():
+            continue
+        for suffix_dir in prefix_dir.iterdir():
+            if not suffix_dir.is_dir():
+                continue
+            for timestamp_dir in suffix_dir.iterdir():
+                if not timestamp_dir.is_dir():
+                    continue
+                # Check if this directory contains both meta.json and data.json.gz
+                if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
+                    job_dirs.append(timestamp_dir)
+
+    return job_dirs
+
+def load_done_list(done_file_path):
+    """Load list of already processed job prefixes from done.txt"""
+    done_file = Path(done_file_path)
+    if not done_file.exists():
+        return set()
+
+    with open(done_file, 'r') as f:
+        return set(line.strip() for line in f if line.strip())
+
+def save_done_list(done_file_path, done_set):
+    """Save list of processed job prefixes to done.txt"""
+    with open(done_file_path, 'w') as f:
+        for prefix in sorted(done_set):
+            f.write(f"{prefix}\n")
+
+def navigate_nested_dict(d, path):
+    """Navigate a nested dictionary using a dot-separated path with array indices"""
+    if not path:
+        return d
+
+    current = d
+    parts = path.split('.')
+
+    for part in parts:
+        if '[' in part and ']' in part:
+            # Handle explicit array access syntax (e.g., "node[0]")
+            array_name = part.split('[')[0]
+            index = int(part.split('[')[1].split(']')[0])
+            current = current[array_name][index]
+        elif part.isdigit():
+            # Handle numeric indices for list access
+            current = current[int(part)]
+        else:
+            # Handle regular dict access
+            current = current[part]
+
+    return current
+
+def get_subcluster_config(cluster_config, subcluster_name):
+    """Return the index of the subcluster with the given name (0 if not found)"""
+    subclusters = cluster_config.get("subClusters", [])
+
+    for i, subcluster in enumerate(subclusters):
+        if subcluster.get("name") == subcluster_name:
+            return i
+
+    return 0  # Default to first subcluster if not found
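+
+# Assumed cluster.json layout (a sketch inferred from the lookups below, not an
+# authoritative schema; the numbers are illustrative): each entry of "subClusters"
+# carries its peak rates as {"unit": {"prefix": ...}, "value": ...} objects, e.g.
+#
+#   {
+#     "subClusters": [
+#       {
+#         "name": "main",
+#         "flopRateScalar":  {"unit": {"prefix": "G"}, "value": 48},
+#         "flopRateSimd":    {"unit": {"prefix": "G"}, "value": 432},
+#         "memoryBandwidth": {"unit": {"prefix": "G"}, "value": 350}
+#       }
+#     ]
+#   }
+#
+# get_performance_value() multiplies "value" by the prefix (K/M/G/T/P) to obtain
+# a plain FLOP/s or byte/s figure.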
+
+def get_performance_value(cluster_config, subcluster_idx, metric_path):
+    """Extract a performance value with its proper unit prefix"""
+    # Build the full path
+    full_path = f"subClusters.{subcluster_idx}.{metric_path}"
+
+    # Extract values
+    value_parts = full_path.split('.')
+
+    # Direct dictionary traversal instead of using navigate_nested_dict
+    current = cluster_config
+    for part in value_parts:
+        if part.isdigit():
+            current = current[int(part)]
+        else:
+            current = current[part]
+
+    value = current.get("value", 0)
+
+    # Get prefix if available
+    prefix = ""
+    if "unit" in current and "prefix" in current["unit"]:
+        prefix = current["unit"]["prefix"]
+
+    # Apply prefix multiplier
+    prefix_multipliers = {
+        'K': 1e3,
+        'M': 1e6,
+        'G': 1e9,
+        'T': 1e12,
+        'P': 1e15,
+        '': 1.0
+    }
+
+    multiplier = prefix_multipliers.get(prefix, 1.0)
+    return value * multiplier
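+
+# The classic roofline model bounds attainable performance by
+#     P(AI) = min(peak FLOP/s, AI * memory bandwidth),
+# with the "knee" at AI = peak / bandwidth. The helper below is a minimal,
+# self-contained sketch of that formula; it is not called by this script (which
+# inlines the same arithmetic in create_roofline_dataframe) but documents the
+# ceiling used for filtering and region classification.
+def attainable_gflops(arith_intensity, peak_gflops, mem_bw_gbs):
+    """Roofline ceiling in GFLOP/s for a given arithmetic intensity (FLOP/byte)."""
+    return min(peak_gflops, arith_intensity * mem_bw_gbs)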
+
+def create_roofline_dataframe(data, meta, cluster_config, output_path=None, save_format='both'):
+    """Create roofline dataframe with data points for ML training
+
+    Args:
+        data: Job performance data
+        meta: Job metadata
+        cluster_config: Cluster configuration
+        output_path: Path to save the dataframe (without extension)
+        save_format: Format to save ('csv', 'pickle', 'parquet', 'hdf5', 'both', 'compressed')
+    """
+    # Get number of nodes and subcluster name
+    num_nodes = meta['numNodes']
+    subcluster_name = meta.get('subCluster', 'main')  # Default to 'main' if not specified
+
+    # Get subcluster index from the cluster configuration
+    subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)
+
+    # Get performance ceilings from cluster configuration
+    scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
+    simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
+    mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")
+
+    print("=== Roofline DataFrame Configuration ===")
+    print(f"Subcluster: {subcluster_name}")
+    print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
+    print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
+    print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
+    print(f"Number of Nodes: {num_nodes}")
+
+    # Convert to GFLOP/s for consistency
+    scalar_peak_gflops = scalar_flops / 1e9
+    simd_peak_gflops = simd_flops / 1e9
+    mem_bw_bytes = mem_bw  # Already in bytes/s from get_performance_value
+
+    # Calculate knee points for rooflines
+    scalar_knee_intensity = scalar_flops / mem_bw_bytes  # FLOP/byte
+    simd_knee_intensity = simd_flops / mem_bw_bytes  # FLOP/byte
+
+    print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
+    print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")
+
+    print("\n=== Data Points ===")
+    print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
+    print("-" * 75)
+
+    # Initialize lists to collect data points
+    data_points = []
+
+    # Statistics for data filtering
+    total_data_points = 0
+    filtered_invalid = 0
+    filtered_above_roof = 0
+    filtered_out_of_bounds = 0
+
+    # Fixed bounds for consistency with original plotting
+    x_min, x_max = 0.01, 1000.0  # FLOP/byte
+    y_min, y_max = 1.0, 10000.0  # GFLOP/s
+
+    for node_num in range(num_nodes):
+        try:
+            # Extract data series
+            mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
+            flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])
+
+            if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
+                continue
+
+            mem_bw_data = mem_bw_series[node_num].get("data", [])
+            flops_data = flops_series[node_num].get("data", [])
+
+            # Ensure we have the same number of time steps for both metrics
+            num_timesteps = min(len(mem_bw_data), len(flops_data))
+            if num_timesteps == 0:
+                continue
+
+            total_data_points += num_timesteps
+
+            # Process each time step
+            for t_idx in range(num_timesteps):
+                # Get data values (assuming raw data is already in appropriate units)
+                bw_raw = mem_bw_data[t_idx]  # Memory bandwidth (raw value)
+                flops_raw = flops_data[t_idx]  # Performance in FLOP/s (raw value)
+
+                # Skip if we have invalid data (None, NaN, or non-positive values)
+                if (bw_raw is None or flops_raw is None or
+                        (isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
+                        (isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
+                        bw_raw <= 0 or flops_raw <= 0):
+                    filtered_invalid += 1
+                    continue
+
+                # For arithmetic intensity calculation, we need consistent units
+                flops_gflops = flops_raw  # Assume raw is already in GFLOP/s
+                bw_gb_per_sec = bw_raw  # Assume raw is in GB/s
+
+                # Calculate AI in FLOP/byte: (GFLOP/s) / (GB/s) = FLOP/byte
+                arith_intensity = flops_gflops / bw_gb_per_sec
+
+                # Additional validation: check for reasonable ranges
+                # Skip if arithmetic intensity is too extreme (likely measurement error)
+                if arith_intensity < 1e-6 or arith_intensity > 1e6:
+                    filtered_invalid += 1
+                    continue
+
+                # Use raw FLOPS value for analysis (assume it's already in GFLOP/s)
+                performance_gflops = flops_raw
+
+                # Check if performance exceeds the theoretical maximum (above roofline)
+                # Use the higher of SIMD or scalar peak performance as the ceiling
+                max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)
+
+                # Also check against memory bandwidth ceiling at this intensity
+                mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9)  # Convert to GFLOP/s
+                effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)
+
+                # Skip unrealistic data points that are significantly above the roofline
+                # Allow for some measurement tolerance (e.g., 5% above theoretical max)
+                if performance_gflops > effective_ceiling * 1.05:
+                    filtered_above_roof += 1
+                    continue
+
+                # Print key data points for monitoring
+                if t_idx % 10 == 0:  # Print every 10th point to avoid spam
+                    print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_gflops:15.2f} {arith_intensity:15.4f}")
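+
+                # Worked example of the classification below (illustrative numbers,
+                # assuming scalar peak 48 GFLOP/s, SIMD peak 432 GFLOP/s, 350 GB/s):
+                # the knees sit at 48/350 ≈ 0.14 and 432/350 ≈ 1.23 FLOP/byte, so a
+                # point at AI = 0.5 FLOP/byte lies between the knees and is capped by
+                # the scalar peak, while a point at AI = 2.0 FLOP/byte is compute
+                # bound with a ceiling of max(scalar, SIMD) = 432 GFLOP/s.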
+                # Check if points are within our fixed bounds
+                if (x_min <= arith_intensity <= x_max and
+                        y_min <= performance_gflops <= y_max):
+
+                    # Determine which roofline region this point falls into
+                    if arith_intensity <= min(scalar_knee_intensity, simd_knee_intensity):
+                        # Memory-bound region
+                        roofline_region = "memory_bound"
+                        theoretical_max = arith_intensity * (mem_bw_bytes / 1e9)
+                    elif arith_intensity <= max(scalar_knee_intensity, simd_knee_intensity):
+                        # Mixed region - determine which compute roof is limiting
+                        if scalar_knee_intensity <= arith_intensity <= simd_knee_intensity:
+                            roofline_region = "scalar_compute_bound"
+                            theoretical_max = scalar_peak_gflops
+                        else:
+                            roofline_region = "simd_compute_bound"
+                            theoretical_max = simd_peak_gflops
+                    else:
+                        # Compute-bound region
+                        roofline_region = "compute_bound"
+                        theoretical_max = max(simd_peak_gflops, scalar_peak_gflops)
+
+                    # Calculate efficiency relative to theoretical maximum
+                    efficiency = performance_gflops / theoretical_max if theoretical_max > 0 else 0
+
+                    # Create data point dictionary
+                    data_point = {
+                        'node_num': node_num,
+                        'timestep': t_idx,
+                        'bandwidth_raw': bw_raw,
+                        'flops_raw': flops_raw,
+                        'performance_gflops': performance_gflops,
+                        'arith_intensity': arith_intensity,
+                        'roofline_region': roofline_region,
+                        'theoretical_max_gflops': theoretical_max,
+                        'efficiency': efficiency,
+                        'scalar_peak_gflops': scalar_peak_gflops,
+                        'simd_peak_gflops': simd_peak_gflops,
+                        'memory_bw_gbs': mem_bw_bytes / 1e9,
+                        'scalar_knee_intensity': scalar_knee_intensity,
+                        'simd_knee_intensity': simd_knee_intensity,
+                        'subcluster_name': subcluster_name,
+                        'job_id': meta.get('jobId', 'unknown'),
+                        'duration': meta.get('duration', 0)
+                    }
+
+                    data_points.append(data_point)
+                else:
+                    filtered_out_of_bounds += 1
+
+        except (KeyError, IndexError, ValueError) as e:
+            print(f"Warning: Error processing data for node {node_num}: {e}")
+            continue
+
+    # Print filtering statistics
+    valid_points = len(data_points)
+    print("\n=== Data Filtering Statistics ===")
+    print(f"Total data points processed: {total_data_points}")
+    print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
+    print(f"Filtered out - Above roofline: {filtered_above_roof}")
+    print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
+    print(f"Valid data points collected: {valid_points}")
+    if total_data_points > 0:
+        print(f"Data retention rate: {(valid_points/total_data_points)*100:.1f}%")
+    else:
+        print("No data points")
+
+    if not data_points:
+        print("\nNo valid data points found within the bounds - skipping dataframe creation")
+        return False, None  # Return False to indicate no dataframe was created
+
+    # Create DataFrame from collected data points
+    df = pd.DataFrame(data_points)
+
+    # Add derived features useful for ML
+    df['intensity_to_scalar_knee_ratio'] = df['arith_intensity'] / df['scalar_knee_intensity']
+    df['intensity_to_simd_knee_ratio'] = df['arith_intensity'] / df['simd_knee_intensity']
+    df['performance_to_scalar_peak_ratio'] = df['performance_gflops'] / df['scalar_peak_gflops']
+    df['performance_to_simd_peak_ratio'] = df['performance_gflops'] / df['simd_peak_gflops']
+    df['bandwidth_to_memory_bw_ratio'] = df['bandwidth_raw'] / df['memory_bw_gbs']
+
+    # Add categorical encodings for roofline regions
+    df['is_memory_bound'] = (df['roofline_region'] == 'memory_bound').astype(int)
+    df['is_scalar_compute_bound'] = (df['roofline_region'] == 'scalar_compute_bound').astype(int)
+    df['is_simd_compute_bound'] = (df['roofline_region'] == 'simd_compute_bound').astype(int)
+    df['is_compute_bound'] = (df['roofline_region'] == 'compute_bound').astype(int)
+
+    print(f"\nCreated DataFrame with {len(df)} rows and {len(df.columns)} columns")
+    print(f"Columns: {list(df.columns)}")
+
+    # Save DataFrame if output path is provided
+    if output_path:
+        # Create output directory if it doesn't exist
+        output_dir = Path(output_path).parent
+        output_dir.mkdir(parents=True, exist_ok=True)
+
+        saved_files = []
+        base_path = Path(output_path)
+
+        if save_format in ['csv', 'both', 'compressed']:
+            # Save as CSV
+            if save_format == 'compressed':
+                csv_path = base_path.with_suffix('.csv.gz')
+                df.to_csv(csv_path, index=False, compression='gzip')
+                print(f"  Compressed CSV: {csv_path}")
+            else:
+                csv_path = base_path.with_suffix('.csv')
+                df.to_csv(csv_path, index=False)
+                print(f"  CSV: {csv_path}")
+            saved_files.append(str(csv_path))
+
+        if save_format in ['pickle', 'both', 'compressed']:
+            # Save as pickle
+            if save_format == 'compressed':
+                pickle_path = base_path.with_suffix('.pkl.gz')
+                df.to_pickle(pickle_path, compression='gzip')
+                print(f"  Compressed Pickle: {pickle_path}")
+            else:
+                pickle_path = base_path.with_suffix('.pkl')
+                df.to_pickle(pickle_path)
+                print(f"  Pickle: {pickle_path}")
+            saved_files.append(str(pickle_path))
+
+        if save_format == 'parquet':
+            # Save as Parquet (compressed by default)
+            try:
+                parquet_path = base_path.with_suffix('.parquet')
+                df.to_parquet(parquet_path, index=False)
+                print(f"  Parquet: {parquet_path}")
+                saved_files.append(str(parquet_path))
+            except ImportError:
+                print("  Warning: Parquet format requires 'pyarrow' or 'fastparquet'. Install with: pip install pyarrow")
+
+        if save_format == 'hdf5':
+            # Save as HDF5
+            try:
+                hdf5_path = base_path.with_suffix('.h5')
+                df.to_hdf(hdf5_path, key='dataframe', mode='w', index=False)
+                print(f"  HDF5: {hdf5_path}")
+                saved_files.append(str(hdf5_path))
+            except ImportError:
+                print("  Warning: HDF5 format requires 'tables'. Install with: pip install tables")
+
+        if saved_files:
+            print(f"\nSuccessfully saved dataframe in {save_format} format(s)")
+            print(f"Files saved: {', '.join(saved_files)}")
+        else:
+            print(f"\nWarning: No files saved. Format '{save_format}' may not be supported or required packages missing.")
+
+        return True, df  # Return True and the dataframe
+    else:
+        print("\nDataFrame created but not saved (no output path provided)")
+        return True, df  # Return True and the dataframe
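+
+# Illustrative single-job usage (paths and the job layout are placeholders; the
+# batch driver below, process_all_jobs(), is the intended entry point):
+#
+#     data, meta = load_job_data_from_folder("archive/123/456/1700000000")
+#     cluster_config = load_cluster_config_from_folder("archive")
+#     ok, df = create_roofline_dataframe(data, meta, cluster_config,
+#                                        output_path="out/123456_1700000000_dataframe",
+#                                        save_format='csv')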
+
+def process_single_job(args):
+    """Process a single job - designed for multiprocessing"""
+    job_path, cluster_config, output_base_dir, save_format = args
+
+    try:
+        # Parse job directory structure: {prefix}/{suffix}/{timestamp}
+        parts = job_path.parts
+        if len(parts) < 3:
+            return False, f"Invalid job path structure: {job_path}"
+
+        job_id_prefix = parts[-3]
+        job_id_suffix = parts[-2]
+        timestamp = parts[-1]
+        job_id = job_id_prefix + job_id_suffix
+
+        # Load job data
+        data, meta = load_job_data_from_folder(job_path)
+
+        # Check job duration - skip jobs shorter than 60 seconds
+        duration = meta.get('duration', 0)
+        if duration < 60:
+            return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"
+
+        # Create output directory for this job prefix
+        output_dir = Path(output_base_dir) / job_id_prefix
+        output_dir.mkdir(parents=True, exist_ok=True)
+
+        # Generate dataframe
+        base_filename = f"{job_id}_{timestamp}"
+        dataframe_path = output_dir / f"{base_filename}_dataframe"
+
+        success, df = create_roofline_dataframe(data, meta, cluster_config,
+                                                output_path=str(dataframe_path), save_format=save_format)
+
+        # Check if dataframe was created successfully
+        if success and df is not None:
+            return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s, {len(df)} data points)"
+        else:
+            return False, f"Skipped job {job_id}_{timestamp}: no valid data to create dataframe"
+
+    except Exception as e:
+        return False, f"Error processing job {job_path}: {e}"
+
+def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None, save_format='both'):
+    """Process all jobs in the folder structure using multiprocessing"""
+    print(f"Processing jobs from: {base_path}")
+    print(f"Output directory: {output_base_dir}")
+    print(f"Save format: {save_format}")
+
+    # Load cluster configuration
+    try:
+        cluster_config = load_cluster_config_from_folder(base_path)
+        print("Loaded cluster configuration")
+    except FileNotFoundError as e:
+        print(f"Error loading cluster config: {e}")
+        return
+
+    # Load already processed jobs
+    done_set = load_done_list(done_file_path)
+    print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")
+
+    # Find all job directories
+    job_dirs = find_job_directories(base_path)
+    print(f"Found {len(job_dirs)} total job directories")
+
+    # Filter out already processed jobs
+    filtered_job_dirs = []
+    for job_path in job_dirs:
+        parts = job_path.parts
+        if len(parts) >= 3:
+            job_id_prefix = parts[-3]
+            if job_id_prefix not in done_set:
+                filtered_job_dirs.append(job_path)
+
+    print(f"Found {len(filtered_job_dirs)} unprocessed job directories")
+
+    if not filtered_job_dirs:
+        print("No new jobs to process")
+        return
+
+    # Determine number of processes
+    if num_processes is None:
+        num_processes = min(12, cpu_count())  # Use at most 12 processes, limited by available cores
+
+    print(f"Using {num_processes} processes for parallel processing")
+
+    # Prepare arguments for multiprocessing
+    process_args = [(job_path, cluster_config, output_base_dir, save_format) for job_path in filtered_job_dirs]
+
+    processed_count = 0
+    skipped_count = 0
+    processed_prefixes = set()
+
+    # Process jobs in parallel
+    if num_processes == 1:
+        # Single-process execution for debugging
+        results = [process_single_job(args) for args in process_args]
+    else:
+        # Parallel execution with a process pool
+        with Pool(num_processes) as pool:
+            results = pool.map(process_single_job, process_args)
+
+    # Collect results
+    for i, (success, message) in enumerate(results):
+        print(message)
+        if success:
+            processed_count += 1
+            # Extract job prefix for done tracking
+            job_path = filtered_job_dirs[i]
+            parts = job_path.parts
+            if len(parts) >= 3:
+                job_id_prefix = parts[-3]
+                processed_prefixes.add(job_id_prefix)
+        else:
+            skipped_count += 1
+
+    # Update done list with newly processed prefixes
+    if processed_prefixes:
+        updated_done_set = done_set | processed_prefixes
+        save_done_list(done_file_path, updated_done_set)
+        print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")
+
+    print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs (duration < 60s or no valid data)")
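+
+# Example invocations (archive and output paths below are placeholders):
+#   python roofline_dataframe_generator.py --base-path /path/to/job-archive --output-dir roofline_dataframes
+#   python roofline_dataframe_generator.py --base-path /path/to/job-archive --save-format parquet --processes 4
+#   python roofline_dataframe_generator.py --ml-example   # print the scikit-learn usage example and exit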
+
+def main():
+    parser = argparse.ArgumentParser(description='Generate roofline dataframes from folder structure for ML training')
+    parser.add_argument('--base-path', default=r'D:\fritz',
+                        help='Path to job archive folder')
+    parser.add_argument('--output-dir', default='roofline_dataframes',
+                        help='Output directory for dataframes')
+    parser.add_argument('--done-file', default='done.txt',
+                        help='File to track processed job prefixes')
+    parser.add_argument('--processes', type=int, default=12,
+                        help='Number of parallel processes to use (default: 12)')
+    parser.add_argument('--save-format', default='both',
+                        choices=['csv', 'pickle', 'both', 'compressed', 'parquet', 'hdf5'],
+                        help='Format to save dataframes (default: both)')
+
+    args = parser.parse_args()
+
+    try:
+        process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes, args.save_format)
+
+    except Exception as e:
+        print(f"Error processing jobs: {e}")
+        import traceback
+        traceback.print_exc()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file