slurm-application-detection/feature_aggregator.py
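
"""Aggregate per-node roofline measurements into one labeled feature row per
(job, node) pair.

Reads the per-job roofline dataframes from HDF5, computes robust summary
statistics (percentiles, MAD, IQR) and bandwidth/FLOPs covariance features,
attaches the application tag from the job_tags SQLite database, and saves the
combined feature table to HDF5 for downstream classification.
"""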

import os
import sqlite3
from glob import glob

import h5py
import numpy as np
import pandas as pd
from scipy import stats


def compute_mad(data):
    """Compute the Median Absolute Deviation (MAD), a robust measure of dispersion."""
    return np.median(np.abs(data - np.median(data)))
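
# Example: compute_mad(np.array([1, 2, 3, 100])) returns 1.0, while np.std of
# the same values is ~42: the single outlier dominates the standard deviation
# but barely moves the MAD.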


def process_roofline_data(base_dir='D:/roofline_dataframes',
                          output_file='D:/roofline_features.h5',
                          job_tags_db='job_tags.db'):
    """
    Process roofline data to extract features for machine learning.

    Args:
        base_dir: Directory containing the per-job roofline dataframes
        output_file: Path to save the output feature table
        job_tags_db: Path to the SQLite database with job tags
    """
    # Connect to the SQLite database
    conn = sqlite3.connect(job_tags_db)
    cursor = conn.cursor()

    # List to store all job features
    all_job_features = []

    # Find all job prefix folders in the base directory
    job_prefixes = [d for d in os.listdir(base_dir)
                    if os.path.isdir(os.path.join(base_dir, d))]

    for job_id_prefix in job_prefixes:
        job_prefix_path = os.path.join(base_dir, job_id_prefix)

        # Find all h5 files in this job prefix folder
        h5_files = glob(os.path.join(job_prefix_path, '*_dataframe.h5'))

        for h5_file in h5_files:
            filename = os.path.basename(h5_file)
            # Extract job_id_full from the filename pattern:
            # {job_id_full}_{timestamp}_dataframe.h5
            job_id_full = filename.split('_')[0]

            try:
                # Verify the expected key exists; the h5py handle closes before
                # pandas reopens the file below
                with h5py.File(h5_file, 'r') as f:
                    if 'dataframe' not in f:
                        print(f"Warning: No 'dataframe' key in {h5_file}")
                        continue

                # Read the dataframe from the h5 file
                df = pd.read_hdf(h5_file, key='dataframe')

                # Group data by node_num
                grouped = df.groupby('node_num')
                for node_num, group in grouped:
                    features = {
                        'job_id': job_id_full,
                        'node_num': node_num,
                    }

                    # Compute statistics for key metrics
                    for axis in ['bandwidth_raw', 'flops_raw', 'arith_intensity']:
                        data = group[axis].values

                        # Compute percentiles
                        p10 = np.percentile(data, 10)
                        p50 = np.median(data)
                        p90 = np.percentile(data, 90)

                        # Compute MAD (more robust than variance)
                        mad = compute_mad(data)

                        # Store features
                        features[f'{axis}_p10'] = p10
                        features[f'{axis}_median'] = p50
                        features[f'{axis}_p90'] = p90
                        features[f'{axis}_mad'] = mad
                        features[f'{axis}_range'] = p90 - p10
                        features[f'{axis}_iqr'] = np.percentile(data, 75) - np.percentile(data, 25)
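                    # The loop above adds six columns per metric, e.g.
                    # bandwidth_raw_p10, flops_raw_mad, arith_intensity_iqr.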

                    # Compute covariance and correlation between bandwidth_raw and flops_raw
                    if len(group) > 1:  # need at least 2 points for correlation
                        cov = np.cov(group['bandwidth_raw'], group['flops_raw'])[0, 1]
                        features['bw_flops_covariance'] = cov
                        corr, _ = stats.pearsonr(group['bandwidth_raw'], group['flops_raw'])
                        features['bw_flops_correlation'] = corr

                    # Additional useful features for the classifier
                    # Performance metrics
                    features['avg_performance_gflops'] = group['performance_gflops'].mean()
                    features['median_performance_gflops'] = group['performance_gflops'].median()
                    features['performance_gflops_mad'] = compute_mad(group['performance_gflops'].values)

                    # # Efficiency metrics
                    # features['avg_efficiency'] = group['efficiency'].mean()
                    # features['median_efficiency'] = group['efficiency'].median()
                    # features['efficiency_mad'] = compute_mad(group['efficiency'].values)
                    # features['efficiency_p10'] = np.percentile(group['efficiency'].values, 10)
                    # features['efficiency_p90'] = np.percentile(group['efficiency'].values, 90)

                    # # Distribution of roofline regions (memory-bound vs compute-bound)
                    # if 'roofline_region' in group.columns:
                    #     region_counts = group['roofline_region'].value_counts(normalize=True).to_dict()
                    #     for region, ratio in region_counts.items():
                    #         features[f'region_{region}_ratio'] = ratio

                    # System characteristics
                    if 'memory_bw_gbs' in group.columns:
                        features['avg_memory_bw_gbs'] = group['memory_bw_gbs'].mean()
                    # Peak rates are constant within a node's group, so take the first entry
                    if 'scalar_peak_gflops' in group.columns and group['scalar_peak_gflops'].notna().any():
                        features['scalar_peak_gflops'] = group['scalar_peak_gflops'].iloc[0]
                    if 'simd_peak_gflops' in group.columns and group['simd_peak_gflops'].notna().any():
                        features['simd_peak_gflops'] = group['simd_peak_gflops'].iloc[0]

                    # # Subcluster information if available
                    # if 'subcluster_name' in group.columns and not group['subcluster_name'].isna().all():
                    #     features['subcluster_name'] = group['subcluster_name'].iloc[0]

                    # Duration information
                    if 'duration' in group.columns:
                        features['duration'] = group['duration'].iloc[0]

                    # Get the label (application type) from the database
                    cursor.execute("SELECT tags FROM job_tags WHERE job_id = ?",
                                   (int(job_id_full),))
                    result = cursor.fetchone()
                    if result:
                        # Extract application name from tags
                        tags = result[0]
                        features['label'] = tags
                    else:
                        features['label'] = 'Unknown'

                    all_job_features.append(features)

            except Exception as e:
                print(f"Error processing file {h5_file}: {e}")

    # Close database connection
    conn.close()

    if not all_job_features:
        print("No features extracted. Check that the input files exist and have the expected format.")
        return None

    # Convert to DataFrame
    features_df = pd.DataFrame(all_job_features)

    # Roofline-region ratio columns (present only if the region block above is
    # enabled) are missing for groups without that region, so fill them with 0
    region_columns = [col for col in features_df.columns if col.startswith('region_')]
    for col in region_columns:
        features_df[col] = features_df[col].fillna(0)

    # Save to H5 file
    features_df.to_hdf(output_file, key='features', mode='w')
    print(f"Processed {len(all_job_features)} job-node combinations")
    print(f"Features saved to {output_file}")
    print(f"Feature columns: {', '.join(features_df.columns)}")

    return features_df


if __name__ == "__main__":
    process_roofline_data()
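

# A minimal sketch of consuming the output, assuming the default output path
# used above (illustrative only, not part of the pipeline):
#
#     features_df = pd.read_hdf('D:/roofline_features.h5', key='features')
#     X = features_df.drop(columns=['job_id', 'node_num', 'label'])
#     y = features_df['label']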