Add feature_aggregator.py

This commit is contained in:
2025-10-14 15:02:50 +02:00
parent 31bee4a1fe
commit 4adda57539

172
feature_aggregator.py Normal file
View File

@@ -0,0 +1,172 @@
import os
import pandas as pd
import numpy as np
import sqlite3
import h5py
from glob import glob
from scipy import stats
from pathlib import Path
def compute_mad(data):
"""Compute Median Absolute Deviation: a robust measure of dispersion"""
return np.median(np.abs(data - np.median(data)))
def process_roofline_data(base_dir='D:/roofline_dataframes',
output_file='D:/roofline_features.h5',
job_tags_db='job_tags.db'):
"""
Process roofline data to extract features for machine learning.
Args:
base_dir: Directory containing roofline dataframes
output_file: Path to save the output features
job_tags_db: Path to SQLite database with job tags
"""
# Connect to the SQLite database
conn = sqlite3.connect(job_tags_db)
cursor = conn.cursor()
# List to store all job features
all_job_features = []
# Find all job prefix folders in the base directory
job_prefixes = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]
for job_id_prefix in job_prefixes:
job_prefix_path = os.path.join(base_dir, job_id_prefix)
# Find all h5 files in this job prefix folder
h5_files = glob(os.path.join(job_prefix_path, '*_dataframe.h5'))
for h5_file in h5_files:
filename = os.path.basename(h5_file)
# Extract job_id_full from the filename pattern: {job_id_full}_{timestamp}_dataframe.h5
job_id_full = filename.split('_')[0]
try:
# Read the dataframe from the h5 file
with h5py.File(h5_file, 'r') as f:
if 'dataframe' not in f:
print(f"Warning: No 'dataframe' key in {h5_file}")
continue
df = pd.read_hdf(h5_file, key='dataframe')
# Group data by node_num
grouped = df.groupby('node_num')
for node_num, group in grouped:
features = {
'job_id': job_id_full,
'node_num': node_num
}
# Compute statistics for key metrics
for axis in ['bandwidth_raw', 'flops_raw', 'arith_intensity']:
data = group[axis].values
# Compute percentiles
p10 = np.percentile(data, 10)
p50 = np.median(data)
p90 = np.percentile(data, 90)
# Compute MAD (more robust than variance)
mad = compute_mad(data)
# Store features
features[f'{axis}_p10'] = p10
features[f'{axis}_median'] = p50
features[f'{axis}_p90'] = p90
features[f'{axis}_mad'] = mad
features[f'{axis}_range'] = p90 - p10
features[f'{axis}_iqr'] = np.percentile(data, 75) - np.percentile(data, 25)
# Compute covariance and correlation between bandwidth_raw and flops_raw
if len(group) > 1: # Need at least 2 points for correlation
cov = np.cov(group['bandwidth_raw'], group['flops_raw'])[0, 1]
features['bw_flops_covariance'] = cov
corr, _ = stats.pearsonr(group['bandwidth_raw'], group['flops_raw'])
features['bw_flops_correlation'] = corr
# Additional useful features for the classifier
# Performance metrics
features['avg_performance_gflops'] = group['performance_gflops'].mean()
features['median_performance_gflops'] = group['performance_gflops'].median()
features['performance_gflops_mad'] = compute_mad(group['performance_gflops'].values)
# # Efficiency metrics
# features['avg_efficiency'] = group['efficiency'].mean()
# features['median_efficiency'] = group['efficiency'].median()
# features['efficiency_mad'] = compute_mad(group['efficiency'].values)
# features['efficiency_p10'] = np.percentile(group['efficiency'].values, 10)
# features['efficiency_p90'] = np.percentile(group['efficiency'].values, 90)
# # Distribution of roofline regions (memory-bound vs compute-bound)
# if 'roofline_region' in group.columns:
# region_counts = group['roofline_region'].value_counts(normalize=True).to_dict()
# for region, ratio in region_counts.items():
# features[f'region_{region}_ratio'] = ratio
# System characteristics
if 'memory_bw_gbs' in group.columns:
features['avg_memory_bw_gbs'] = group['memory_bw_gbs'].mean()
if 'scalar_peak_gflops' in group.columns and len(group['scalar_peak_gflops'].unique()) > 0:
features['scalar_peak_gflops'] = group['scalar_peak_gflops'].iloc[0]
if 'simd_peak_gflops' in group.columns and len(group['simd_peak_gflops'].unique()) > 0:
features['simd_peak_gflops'] = group['simd_peak_gflops'].iloc[0]
# # Subcluster information if available
# if 'subcluster_name' in group.columns and not group['subcluster_name'].isna().all():
# features['subcluster_name'] = group['subcluster_name'].iloc[0]
# Duration information
if 'duration' in group.columns:
features['duration'] = group['duration'].iloc[0]
# Get the label (application type) from the database
cursor.execute("SELECT tags FROM job_tags WHERE job_id = ?", (int(job_id_full),))
result = cursor.fetchone()
if result:
# Extract application name from tags
tags = result[0]
features['label'] = tags
else:
features['label'] = 'Unknown'
all_job_features.append(features)
except Exception as e:
print(f"Error processing file {h5_file}: {e}")
# Close database connection
conn.close()
if not all_job_features:
print("No features extracted. Check if files exist and have the correct format.")
return
# Convert to DataFrame
features_df = pd.DataFrame(all_job_features)
# Fill missing roofline region ratios with 0
region_columns = [col for col in features_df.columns if col.startswith('region_')]
for col in region_columns:
if col not in features_df.columns:
features_df[col] = 0
else:
features_df[col] = features_df[col].fillna(0)
# Save to H5 file
features_df.to_hdf(output_file, key='features', mode='w')
print(f"Processed {len(all_job_features)} job-node combinations")
print(f"Features saved to {output_file}")
print(f"Feature columns: {', '.join(features_df.columns)}")
return features_df
if __name__ == "__main__":
process_roofline_data()