172 lines
7.7 KiB
Python
172 lines
7.7 KiB
Python
import os
|
|
import pandas as pd
|
|
import numpy as np
|
|
import sqlite3
|
|
import h5py
|
|
from glob import glob
|
|
from scipy import stats
|
|
from pathlib import Path
|
|
|
|
def compute_mad(data):
    """Compute the Median Absolute Deviation (MAD), a robust measure of dispersion.

    Args:
        data: Array-like of numeric values (list, NumPy array, ...).

    Returns:
        The median of the absolute deviations of ``data`` from its own median.
        ``nan`` is returned for empty input.
    """
    values = np.asarray(data)
    if values.size == 0:
        # np.median of an empty array is nan as well, but it emits a
        # RuntimeWarning; return the same value quietly.
        return np.nan
    center = np.median(values)
    return np.median(np.abs(values - center))
|
|
|
|
def _read_roofline_frame(h5_file):
    """Load the 'dataframe' table from ``h5_file``; return None (with a warning) if the key is absent."""
    # Check for the key first and close the h5py handle before pandas
    # re-opens the same file.
    with h5py.File(h5_file, 'r') as f:
        has_frame = 'dataframe' in f
    if not has_frame:
        print(f"Warning: No 'dataframe' key in {h5_file}")
        return None
    return pd.read_hdf(h5_file, key='dataframe')


def _lookup_job_label(cursor, job_id_full):
    """Return the ``tags`` value stored for ``job_id_full`` in ``job_tags``, or 'Unknown' if no row exists."""
    # int() raises ValueError for a non-numeric id; the caller's per-file
    # error handler reports it and moves on to the next file.
    cursor.execute("SELECT tags FROM job_tags WHERE job_id = ?", (int(job_id_full),))
    row = cursor.fetchone()
    return row[0] if row is not None else 'Unknown'


def _node_features(group):
    """Compute the statistical features for the rows belonging to a single node."""
    features = {}

    # Robust summary statistics for the key roofline metrics.
    for axis in ('bandwidth_raw', 'flops_raw', 'arith_intensity'):
        data = group[axis].values
        # One call computes every percentile needed below.
        p10, p25, p75, p90 = np.percentile(data, [10, 25, 75, 90])

        features[f'{axis}_p10'] = p10
        features[f'{axis}_median'] = np.median(data)
        features[f'{axis}_p90'] = p90
        # MAD is more robust to outliers than variance.
        features[f'{axis}_mad'] = compute_mad(data)
        features[f'{axis}_range'] = p90 - p10
        features[f'{axis}_iqr'] = p75 - p25

    # Covariance and correlation between bandwidth and flops need at least
    # two samples; with fewer, the keys are simply left out.
    if len(group) > 1:
        bandwidth = group['bandwidth_raw']
        flops = group['flops_raw']
        features['bw_flops_covariance'] = np.cov(bandwidth, flops)[0, 1]
        corr, _ = stats.pearsonr(bandwidth, flops)
        features['bw_flops_correlation'] = corr

    # Performance metrics.
    performance = group['performance_gflops']
    features['avg_performance_gflops'] = performance.mean()
    features['median_performance_gflops'] = performance.median()
    features['performance_gflops_mad'] = compute_mad(performance.values)

    # NOTE: efficiency, roofline-region ratio and subcluster features are
    # currently disabled and therefore not emitted.

    # System characteristics and duration are optional columns. The peak
    # values and the duration are read from the group's first row.
    if 'memory_bw_gbs' in group.columns:
        features['avg_memory_bw_gbs'] = group['memory_bw_gbs'].mean()
    if 'scalar_peak_gflops' in group.columns:
        features['scalar_peak_gflops'] = group['scalar_peak_gflops'].iloc[0]
    if 'simd_peak_gflops' in group.columns:
        features['simd_peak_gflops'] = group['simd_peak_gflops'].iloc[0]
    if 'duration' in group.columns:
        features['duration'] = group['duration'].iloc[0]

    return features


def process_roofline_data(base_dir='D:/roofline_dataframes',
                          output_file='D:/roofline_features.h5',
                          job_tags_db='job_tags.db'):
    """
    Process roofline data to extract features for machine learning.

    Every sub-directory of ``base_dir`` is scanned for ``*_dataframe.h5``
    files named ``{job_id_full}_{timestamp}_dataframe.h5``. Each file's
    'dataframe' table is grouped by ``node_num`` and one feature row is
    produced per (job, node) pair, labelled with the job's ``tags`` value
    from the ``job_tags`` table ('Unknown' when the job has no row).

    A file that fails to process is reported on stdout and skipped.

    Args:
        base_dir: Directory containing roofline dataframes
        output_file: Path to save the output features
        job_tags_db: Path to SQLite database with job tags

    Returns:
        The features DataFrame that was written to ``output_file`` under the
        key 'features', or None when no features could be extracted.
    """
    all_job_features = []
    # The label depends only on the job id, so query the database once per
    # job instead of once per node.
    label_cache = {}

    conn = sqlite3.connect(job_tags_db)
    try:
        cursor = conn.cursor()

        # Sorted so the output row order does not depend on directory
        # listing order.
        job_prefixes = sorted(
            d for d in os.listdir(base_dir)
            if os.path.isdir(os.path.join(base_dir, d))
        )

        for job_id_prefix in job_prefixes:
            job_prefix_path = os.path.join(base_dir, job_id_prefix)
            h5_files = sorted(glob(os.path.join(job_prefix_path, '*_dataframe.h5')))

            for h5_file in h5_files:
                # Filename pattern: {job_id_full}_{timestamp}_dataframe.h5
                job_id_full = os.path.basename(h5_file).split('_')[0]

                try:
                    df = _read_roofline_frame(h5_file)
                    if df is None:
                        continue

                    for node_num, group in df.groupby('node_num'):
                        features = {
                            'job_id': job_id_full,
                            'node_num': node_num,
                        }
                        features.update(_node_features(group))

                        if job_id_full not in label_cache:
                            label_cache[job_id_full] = _lookup_job_label(cursor, job_id_full)
                        features['label'] = label_cache[job_id_full]

                        all_job_features.append(features)
                except Exception as e:
                    # Deliberate best-effort: one bad file must not abort
                    # the whole run.
                    print(f"Error processing file {h5_file}: {e}")
    finally:
        # Close the connection even if scanning base_dir itself fails.
        conn.close()

    if not all_job_features:
        print("No features extracted. Check if files exist and have the correct format.")
        return

    features_df = pd.DataFrame(all_job_features)

    # Fill missing roofline region ratios with 0. No 'region_' columns are
    # produced while the region features are disabled, so this is a no-op
    # until they are re-enabled.
    region_columns = [col for col in features_df.columns if col.startswith('region_')]
    for col in region_columns:
        features_df[col] = features_df[col].fillna(0)

    features_df.to_hdf(output_file, key='features', mode='w')

    print(f"Processed {len(all_job_features)} job-node combinations")
    print(f"Features saved to {output_file}")
    print(f"Feature columns: {', '.join(features_df.columns)}")

    return features_df
|
|
|
|
if __name__ == "__main__":
    # Run the feature extraction with its default paths when executed as a script.
    process_roofline_data()