import os import pandas as pd import numpy as np import sqlite3 import h5py from glob import glob from scipy import stats from pathlib import Path def compute_mad(data): """Compute Median Absolute Deviation: a robust measure of dispersion""" return np.median(np.abs(data - np.median(data))) def process_roofline_data(base_dir='D:/roofline_dataframes', output_file='D:/roofline_features.h5', job_tags_db='job_tags.db'): """ Process roofline data to extract features for machine learning. Args: base_dir: Directory containing roofline dataframes output_file: Path to save the output features job_tags_db: Path to SQLite database with job tags """ # Connect to the SQLite database conn = sqlite3.connect(job_tags_db) cursor = conn.cursor() # List to store all job features all_job_features = [] # Find all job prefix folders in the base directory job_prefixes = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))] for job_id_prefix in job_prefixes: job_prefix_path = os.path.join(base_dir, job_id_prefix) # Find all h5 files in this job prefix folder h5_files = glob(os.path.join(job_prefix_path, '*_dataframe.h5')) for h5_file in h5_files: filename = os.path.basename(h5_file) # Extract job_id_full from the filename pattern: {job_id_full}_{timestamp}_dataframe.h5 job_id_full = filename.split('_')[0] try: # Read the dataframe from the h5 file with h5py.File(h5_file, 'r') as f: if 'dataframe' not in f: print(f"Warning: No 'dataframe' key in {h5_file}") continue df = pd.read_hdf(h5_file, key='dataframe') # Group data by node_num grouped = df.groupby('node_num') for node_num, group in grouped: features = { 'job_id': job_id_full, 'node_num': node_num } # Compute statistics for key metrics for axis in ['bandwidth_raw', 'flops_raw', 'arith_intensity']: data = group[axis].values # Compute percentiles p10 = np.percentile(data, 10) p50 = np.median(data) p90 = np.percentile(data, 90) # Compute MAD (more robust than variance) mad = compute_mad(data) # Store features features[f'{axis}_p10'] = p10 features[f'{axis}_median'] = p50 features[f'{axis}_p90'] = p90 features[f'{axis}_mad'] = mad features[f'{axis}_range'] = p90 - p10 features[f'{axis}_iqr'] = np.percentile(data, 75) - np.percentile(data, 25) # Compute covariance and correlation between bandwidth_raw and flops_raw if len(group) > 1: # Need at least 2 points for correlation cov = np.cov(group['bandwidth_raw'], group['flops_raw'])[0, 1] features['bw_flops_covariance'] = cov corr, _ = stats.pearsonr(group['bandwidth_raw'], group['flops_raw']) features['bw_flops_correlation'] = corr # Additional useful features for the classifier # Performance metrics features['avg_performance_gflops'] = group['performance_gflops'].mean() features['median_performance_gflops'] = group['performance_gflops'].median() features['performance_gflops_mad'] = compute_mad(group['performance_gflops'].values) # # Efficiency metrics # features['avg_efficiency'] = group['efficiency'].mean() # features['median_efficiency'] = group['efficiency'].median() # features['efficiency_mad'] = compute_mad(group['efficiency'].values) # features['efficiency_p10'] = np.percentile(group['efficiency'].values, 10) # features['efficiency_p90'] = np.percentile(group['efficiency'].values, 90) # # Distribution of roofline regions (memory-bound vs compute-bound) # if 'roofline_region' in group.columns: # region_counts = group['roofline_region'].value_counts(normalize=True).to_dict() # for region, ratio in region_counts.items(): # features[f'region_{region}_ratio'] = ratio # System characteristics if 'memory_bw_gbs' in group.columns: features['avg_memory_bw_gbs'] = group['memory_bw_gbs'].mean() if 'scalar_peak_gflops' in group.columns and len(group['scalar_peak_gflops'].unique()) > 0: features['scalar_peak_gflops'] = group['scalar_peak_gflops'].iloc[0] if 'simd_peak_gflops' in group.columns and len(group['simd_peak_gflops'].unique()) > 0: features['simd_peak_gflops'] = group['simd_peak_gflops'].iloc[0] # # Subcluster information if available # if 'subcluster_name' in group.columns and not group['subcluster_name'].isna().all(): # features['subcluster_name'] = group['subcluster_name'].iloc[0] # Duration information if 'duration' in group.columns: features['duration'] = group['duration'].iloc[0] # Get the label (application type) from the database cursor.execute("SELECT tags FROM job_tags WHERE job_id = ?", (int(job_id_full),)) result = cursor.fetchone() if result: # Extract application name from tags tags = result[0] features['label'] = tags else: features['label'] = 'Unknown' all_job_features.append(features) except Exception as e: print(f"Error processing file {h5_file}: {e}") # Close database connection conn.close() if not all_job_features: print("No features extracted. Check if files exist and have the correct format.") return # Convert to DataFrame features_df = pd.DataFrame(all_job_features) # Fill missing roofline region ratios with 0 region_columns = [col for col in features_df.columns if col.startswith('region_')] for col in region_columns: if col not in features_df.columns: features_df[col] = 0 else: features_df[col] = features_df[col].fillna(0) # Save to H5 file features_df.to_hdf(output_file, key='features', mode='w') print(f"Processed {len(all_job_features)} job-node combinations") print(f"Features saved to {output_file}") print(f"Feature columns: {', '.join(features_df.columns)}") return features_df if __name__ == "__main__": process_roofline_data()