diff --git a/feature_aggregator.py b/feature_aggregator.py
new file mode 100644
index 0000000..aceeae8
--- /dev/null
+++ b/feature_aggregator.py
@@ -0,0 +1,172 @@
+import os
+import pandas as pd
+import numpy as np
+import sqlite3
+import h5py
+from glob import glob
+from scipy import stats
+from pathlib import Path
+
+def compute_mad(data):
+    """Compute Median Absolute Deviation: a robust measure of dispersion"""
+    return np.median(np.abs(data - np.median(data)))
+
+def process_roofline_data(base_dir='D:/roofline_dataframes',
+                          output_file='D:/roofline_features.h5',
+                          job_tags_db='job_tags.db'):
+    """
+    Process roofline data to extract features for machine learning.
+
+    Args:
+        base_dir: Directory containing roofline dataframes
+        output_file: Path to save the output features
+        job_tags_db: Path to SQLite database with job tags
+    """
+    # Connect to the SQLite database
+    conn = sqlite3.connect(job_tags_db)
+    cursor = conn.cursor()
+
+    # List to store all job features
+    all_job_features = []
+
+    # Find all job prefix folders in the base directory
+    job_prefixes = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]
+
+    for job_id_prefix in job_prefixes:
+        job_prefix_path = os.path.join(base_dir, job_id_prefix)
+
+        # Find all h5 files in this job prefix folder
+        h5_files = glob(os.path.join(job_prefix_path, '*_dataframe.h5'))
+
+        for h5_file in h5_files:
+            filename = os.path.basename(h5_file)
+            # Extract job_id_full from the filename pattern: {job_id_full}_{timestamp}_dataframe.h5
+            job_id_full = filename.split('_')[0]
+
+            try:
+                # Read the dataframe from the h5 file
+                with h5py.File(h5_file, 'r') as f:
+                    if 'dataframe' not in f:
+                        print(f"Warning: No 'dataframe' key in {h5_file}")
+                        continue
+
+                df = pd.read_hdf(h5_file, key='dataframe')
+
+                # Group data by node_num
+                grouped = df.groupby('node_num')
+
+                for node_num, group in grouped:
+                    features = {
+                        'job_id': job_id_full,
+                        'node_num': node_num
+                    }
+
+                    # Compute statistics for key metrics
+                    for axis in ['bandwidth_raw', 'flops_raw', 'arith_intensity']:
+                        data = group[axis].values
+
+                        # Compute percentiles
+                        p10 = np.percentile(data, 10)
+                        p50 = np.median(data)
+                        p90 = np.percentile(data, 90)
+
+                        # Compute MAD (more robust than variance)
+                        mad = compute_mad(data)
+
+                        # Store features
+                        features[f'{axis}_p10'] = p10
+                        features[f'{axis}_median'] = p50
+                        features[f'{axis}_p90'] = p90
+                        features[f'{axis}_mad'] = mad
+                        features[f'{axis}_range'] = p90 - p10
+                        features[f'{axis}_iqr'] = np.percentile(data, 75) - np.percentile(data, 25)
+
+                    # Compute covariance and correlation between bandwidth_raw and flops_raw
+                    if len(group) > 1:  # Need at least 2 points for correlation
+                        cov = np.cov(group['bandwidth_raw'], group['flops_raw'])[0, 1]
+                        features['bw_flops_covariance'] = cov
+
+                        corr, _ = stats.pearsonr(group['bandwidth_raw'], group['flops_raw'])
+                        features['bw_flops_correlation'] = corr
+
+                    # Additional useful features for the classifier
+
+                    # Performance metrics
+                    features['avg_performance_gflops'] = group['performance_gflops'].mean()
+                    features['median_performance_gflops'] = group['performance_gflops'].median()
+                    features['performance_gflops_mad'] = compute_mad(group['performance_gflops'].values)
+
+                    # # Efficiency metrics
+                    # features['avg_efficiency'] = group['efficiency'].mean()
+                    # features['median_efficiency'] = group['efficiency'].median()
+                    # features['efficiency_mad'] = compute_mad(group['efficiency'].values)
+                    # features['efficiency_p10'] = np.percentile(group['efficiency'].values, 10)
+                    # features['efficiency_p90'] = np.percentile(group['efficiency'].values, 90)
+
+                    # # Distribution of roofline regions (memory-bound vs compute-bound)
+                    # if 'roofline_region' in group.columns:
+                    #     region_counts = group['roofline_region'].value_counts(normalize=True).to_dict()
+                    #     for region, ratio in region_counts.items():
+                    #         features[f'region_{region}_ratio'] = ratio
+
+                    # System characteristics
+                    if 'memory_bw_gbs' in group.columns:
+                        features['avg_memory_bw_gbs'] = group['memory_bw_gbs'].mean()
+                    if 'scalar_peak_gflops' in group.columns and len(group['scalar_peak_gflops'].unique()) > 0:
+                        features['scalar_peak_gflops'] = group['scalar_peak_gflops'].iloc[0]
+                    if 'simd_peak_gflops' in group.columns and len(group['simd_peak_gflops'].unique()) > 0:
+                        features['simd_peak_gflops'] = group['simd_peak_gflops'].iloc[0]
+
+                    # # Subcluster information if available
+                    # if 'subcluster_name' in group.columns and not group['subcluster_name'].isna().all():
+                    #     features['subcluster_name'] = group['subcluster_name'].iloc[0]
+
+                    # Duration information
+                    if 'duration' in group.columns:
+                        features['duration'] = group['duration'].iloc[0]
+
+                    # Get the label (application type) from the database
+                    cursor.execute("SELECT tags FROM job_tags WHERE job_id = ?", (int(job_id_full),))
+                    result = cursor.fetchone()
+
+                    if result:
+                        # Extract application name from tags
+                        tags = result[0]
+                        features['label'] = tags
+                    else:
+                        features['label'] = 'Unknown'
+
+                    all_job_features.append(features)
+
+            except Exception as e:
+                print(f"Error processing file {h5_file}: {e}")
+
+    # Close database connection
+    conn.close()
+
+    if not all_job_features:
+        print("No features extracted. Check if files exist and have the correct format.")
+        return
+
+    # Convert to DataFrame
+    features_df = pd.DataFrame(all_job_features)
+
+    # Fill missing roofline region ratios with 0
+    region_columns = [col for col in features_df.columns if col.startswith('region_')]
+    for col in region_columns:
+        if col not in features_df.columns:
+            features_df[col] = 0
+        else:
+            features_df[col] = features_df[col].fillna(0)
+
+    # Save to H5 file
+    features_df.to_hdf(output_file, key='features', mode='w')
+
+    print(f"Processed {len(all_job_features)} job-node combinations")
+    print(f"Features saved to {output_file}")
+    print(f"Feature columns: {', '.join(features_df.columns)}")
+
+    return features_df
+
+if __name__ == "__main__":
+    process_roofline_data()
\ No newline at end of file
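
For context, a minimal sketch of how the feature table written by process_roofline_data() could be consumed downstream. It assumes scikit-learn is available; the RandomForestClassifier choice, the hold-out split, and reuse of the default D:/roofline_features.h5 path are illustrative assumptions and not part of this change.

import pandas as pd
from sklearn.ensemble import RandomForestClassifier   # assumed model; any classifier would do
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Load the per-(job, node) feature table produced by feature_aggregator.py
features_df = pd.read_hdf('D:/roofline_features.h5', key='features')

# 'job_id' and 'node_num' are identifiers and 'label' is the target; keep only numeric features
X = features_df.drop(columns=['job_id', 'node_num', 'label']).select_dtypes('number').fillna(0)
y = features_df['label']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

clf = RandomForestClassifier(n_estimators=200, random_state=0)
clf.fit(X_train, y_train)
print(classification_report(y_test, clf.predict(X_test)))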