slurm-application-detection/roofline_dataframe_generator.py

#!/usr/bin/env python3
import os
import sys
import gzip
import json
import argparse
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime
from multiprocessing import Pool, cpu_count
import functools
def create_ml_example():
"""
Example function showing how to use the generated dataframes for ML training
"""
print("=== Machine Learning Example with Roofline Data ===")
print("This is an example of how to use the generated dataframes for training ML models.")
print("The dataframes contain features suitable for various ML tasks:")
print("- Regression: Predict performance (performance_gflops)")
print("- Classification: Predict roofline region (roofline_region)")
print("- Clustering: Group similar performance patterns")
print()
# Example feature columns available in the dataframes
feature_columns = [
'arith_intensity', 'bandwidth_raw', 'flops_raw',
'intensity_to_scalar_knee_ratio', 'intensity_to_simd_knee_ratio',
'performance_to_scalar_peak_ratio', 'performance_to_simd_peak_ratio',
'bandwidth_to_memory_bw_ratio', 'efficiency',
'is_memory_bound', 'is_scalar_compute_bound',
'is_simd_compute_bound', 'is_compute_bound'
]
target_columns = [
'performance_gflops', # For regression
'roofline_region', # For classification
'efficiency' # For regression
]
print("Available features:")
for i, feature in enumerate(feature_columns, 1):
print(f" {i:2d}. {feature}")
print("\nAvailable targets:")
for i, target in enumerate(target_columns, 1):
print(f" {i:2d}. {target}")
print("\nExample scikit-learn code:")
print("""
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
# Load dataframe
df = pd.read_csv('path/to/dataframe.csv')
# Prepare features and target
features = ['arith_intensity', 'bandwidth_raw', 'flops_raw', 'efficiency']
X = df[features]
y = df['performance_gflops']
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
print(f"R² Score: {r2_score(y_test, y_pred):.4f}")
print(f"RMSE: {mean_squared_error(y_test, y_pred, squared=False):.4f}")
""")
if __name__ == "__main__":
# Check if this is being run as an example
if len(sys.argv) > 1 and sys.argv[1] == '--ml-example':
create_ml_example()
sys.exit(0)
# ... existing code ...
def load_job_data_from_folder(job_path):
"""Load job data from folder"""
job_path = Path(job_path)
# Load data.json.gz
data_file_path = job_path / "data.json.gz"
if not data_file_path.exists():
raise FileNotFoundError(f"data.json.gz not found in {job_path}")
with gzip.open(data_file_path, 'rt') as f:
data = json.load(f)
# Load meta.json
meta_file_path = job_path / "meta.json"
if not meta_file_path.exists():
raise FileNotFoundError(f"meta.json not found in {job_path}")
with open(meta_file_path, 'r') as f:
meta = json.load(f)
return data, meta
def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
"""Load cluster configuration data from folder"""
config_path = Path(base_path) / cluster_path
if not config_path.exists():
raise FileNotFoundError(f"cluster.json not found at {config_path}")
with open(config_path, 'r') as f:
return json.load(f)
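# Illustrative sketch (assumed, not a schema) of the cluster.json fields consumed by
# get_subcluster_config() and get_performance_value() below; the numbers are placeholders:
# {
#   "subClusters": [
#     {
#       "name": "main",
#       "flopRateScalar":  {"value": 48,  "unit": {"prefix": "G"}},
#       "flopRateSimd":    {"value": 768, "unit": {"prefix": "G"}},
#       "memoryBandwidth": {"value": 256, "unit": {"prefix": "G"}}
#     }
#   ]
# }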
def find_job_directories(base_path):
"""Find all job directories in the folder structure"""
base_path = Path(base_path)
job_dirs = []
# Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
for prefix_dir in base_path.iterdir():
if not prefix_dir.is_dir():
continue
for suffix_dir in prefix_dir.iterdir():
if not suffix_dir.is_dir():
continue
for timestamp_dir in suffix_dir.iterdir():
if not timestamp_dir.is_dir():
continue
# Check if this directory contains both meta.json and data.json.gz
if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
job_dirs.append(timestamp_dir)
return job_dirs
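# Assumed on-disk layout matching the {prefix}/{suffix}/{timestamp} walk above
# (directory names below are hypothetical):
#   <base_path>/
#     cluster.json
#     123/
#       456/
#         1690000000/
#           meta.json
#           data.json.gz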
def load_done_list(done_file_path):
"""Load list of already processed job prefixes from done.txt"""
done_file = Path(done_file_path)
if not done_file.exists():
return set()
with open(done_file, 'r') as f:
return set(line.strip() for line in f if line.strip())
def save_done_list(done_file_path, done_set):
"""Save list of processed job prefixes to done.txt"""
with open(done_file_path, 'w') as f:
for prefix in sorted(done_set):
f.write(f"{prefix}\n")
def navigate_nested_dict(d, path):
"""Navigate a nested dictionary using a dot-separated path with array indices"""
if not path:
return d
current = d
parts = path.split('.')
for part in parts:
if '[' in part and ']' in part:
# Handle explicit array access syntax (e.g., "node[0]")
array_name = part.split('[')[0]
index = int(part.split('[')[1].split(']')[0])
current = current[array_name][index]
elif part.isdigit():
# Handle numeric indices for list access
current = current[int(part)]
else:
# Handle regular dict access
current = current[part]
return current
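# Example paths accepted by navigate_nested_dict (objects and values are hypothetical):
#   navigate_nested_dict(cfg,  "subClusters.0.flopRateScalar.value")  # bare numeric segment -> list index
#   navigate_nested_dict(data, "mem_bw.node.series[0].data")          # "name[idx]" segment -> dict key, then index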
def get_subcluster_config(cluster_config, subcluster_name):
"""Get the configuration for a specific subcluster by name"""
subclusters = cluster_config.get("subClusters", [])
for i, subcluster in enumerate(subclusters):
if subcluster.get("name") == subcluster_name:
return i
return 0 # Default to first subcluster if not found
def get_performance_value(cluster_config, subcluster_idx, metric_path):
"""Extract a performance value with its proper unit prefix"""
# Build the full path
full_path = f"subClusters.{subcluster_idx}.{metric_path}"
# Extract values
value_parts = full_path.split('.')
# Direct dictionary traversal instead of using navigate_nested_dict
current = cluster_config
for part in value_parts:
if part.isdigit():
current = current[int(part)]
else:
current = current[part]
value = current.get("value", 0)
# Get prefix if available
prefix = ""
if "unit" in current and "prefix" in current["unit"]:
prefix = current["unit"]["prefix"]
# Apply prefix multiplier
prefix_multipliers = {
'K': 1e3,
'M': 1e6,
'G': 1e9,
'T': 1e12,
'P': 1e15,
'': 1.0
}
multiplier = prefix_multipliers.get(prefix, 1.0)
return value * multiplier
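# Example (hypothetical numbers): with the cluster.json sketch above,
# get_performance_value(cfg, 0, "flopRateScalar") walks subClusters[0]["flopRateScalar"],
# reads value=48 and prefix="G", and returns 48 * 1e9 FLOP/s.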
def create_roofline_dataframe(data, meta, cluster_config, output_path=None, save_format='both'):
"""Create roofline dataframe with data points for ML training
Args:
data: Job performance data
meta: Job metadata
cluster_config: Cluster configuration
output_path: Path to save the dataframe (without extension)
save_format: Format to save ('csv', 'pickle', 'parquet', 'hdf5', 'both', 'compressed')
"""
# Get number of nodes and subcluster name
num_nodes = meta['numNodes']
subcluster_name = meta.get('subCluster', 'main') # Default to 'main' if not specified
# Get subcluster index from the cluster configuration
subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)
# Get performance ceilings from cluster configuration
scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")
print(f"=== Roofline DataFrame Configuration ===")
print(f"Subcluster: {subcluster_name}")
print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
print(f"Number of Nodes: {num_nodes}")
# Convert to GFLOP/s for consistency
scalar_peak_gflops = scalar_flops / 1e9
simd_peak_gflops = simd_flops / 1e9
mem_bw_bytes = mem_bw # Already in bytes/s from get_performance_value
# Calculate knee points for rooflines
scalar_knee_intensity = scalar_flops / mem_bw_bytes # FLOP/byte
simd_knee_intensity = simd_flops / mem_bw_bytes # FLOP/byte
print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")
print(f"\n=== Data Points ===")
print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
print("-" * 75)
# Initialize lists to collect data points
data_points = []
# Statistics for data filtering
total_data_points = 0
filtered_invalid = 0
filtered_above_roof = 0
filtered_out_of_bounds = 0
# Fixed bounds for consistency with original plotting
x_min, x_max = 0.01, 1000.0 # FLOP/byte
y_min, y_max = 1.0, 10000.0 # GFLOP/s
for node_num in range(num_nodes):
try:
# Extract data series
mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])
if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
continue
mem_bw_data = mem_bw_series[node_num].get("data", [])
flops_data = flops_series[node_num].get("data", [])
# Ensure we have the same number of time steps for both metrics
num_timesteps = min(len(mem_bw_data), len(flops_data))
if num_timesteps == 0:
continue
total_data_points += num_timesteps
# Process each time step
for t_idx in range(num_timesteps):
# Get data values (assuming raw data is already in appropriate units)
bw_raw = mem_bw_data[t_idx] # Memory bandwidth (raw value)
flops_raw = flops_data[t_idx] # Performance in FLOP/s (raw value)
# Skip if we have invalid data (None, NaN, or non-positive values)
if (bw_raw is None or flops_raw is None or
(isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
(isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
bw_raw <= 0 or flops_raw <= 0):
filtered_invalid += 1
continue
# For arithmetic intensity calculation, we need consistent units
flops_gflops = flops_raw # Assume raw is already in GFLOP/s
bw_gb_per_sec = bw_raw # Assume raw is in GB/s
# Calculate AI in FLOP/byte: (GFLOP/s) / (GB/s) = FLOP/byte
arith_intensity = flops_gflops / bw_gb_per_sec
# Additional validation: check for reasonable ranges
# Skip if arithmetic intensity is too extreme (likely measurement error)
if arith_intensity < 1e-6 or arith_intensity > 1e6:
filtered_invalid += 1
continue
# Use raw FLOPS value for analysis (assume it's already in GFLOP/s)
performance_gflops = flops_raw
# Check if performance exceeds the theoretical maximum (above roofline)
# Use the higher of SIMD or scalar peak performance as the ceiling
max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)
# Also check against memory bandwidth ceiling at this intensity
mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9) # Convert to GFLOP/s
effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)
# Skip unrealistic data points that are significantly above the roofline
# Allow for some measurement tolerance (e.g., 5% above theoretical max)
if performance_gflops > effective_ceiling * 1.05:
filtered_above_roof += 1
continue
# Print key data points for monitoring
if t_idx % 10 == 0: # Print every 10th point to avoid spam
print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_gflops:15.2f} {arith_intensity:15.4f}")
# Check if points are within our fixed bounds
if (x_min <= arith_intensity <= x_max and
y_min <= performance_gflops <= y_max):
# Determine which roofline region this point falls into
if arith_intensity <= min(scalar_knee_intensity, simd_knee_intensity):
# Memory-bound region
roofline_region = "memory_bound"
theoretical_max = arith_intensity * (mem_bw_bytes / 1e9)
elif arith_intensity <= max(scalar_knee_intensity, simd_knee_intensity):
# Mixed region - determine which compute roof is limiting
if scalar_knee_intensity <= arith_intensity <= simd_knee_intensity:
roofline_region = "scalar_compute_bound"
theoretical_max = scalar_peak_gflops
else:
roofline_region = "simd_compute_bound"
theoretical_max = simd_peak_gflops
else:
# Compute-bound region
roofline_region = "compute_bound"
theoretical_max = max(simd_peak_gflops, scalar_peak_gflops)
# Calculate efficiency relative to theoretical maximum
efficiency = performance_gflops / theoretical_max if theoretical_max > 0 else 0
# Create data point dictionary
data_point = {
'node_num': node_num,
'timestep': t_idx,
'bandwidth_raw': bw_raw,
'flops_raw': flops_raw,
'performance_gflops': performance_gflops,
'arith_intensity': arith_intensity,
'roofline_region': roofline_region,
'theoretical_max_gflops': theoretical_max,
'efficiency': efficiency,
'scalar_peak_gflops': scalar_peak_gflops,
'simd_peak_gflops': simd_peak_gflops,
'memory_bw_gbs': mem_bw_bytes / 1e9,
'scalar_knee_intensity': scalar_knee_intensity,
'simd_knee_intensity': simd_knee_intensity,
'subcluster_name': subcluster_name,
'job_id': meta.get('jobId', 'unknown'),
'duration': meta.get('duration', 0)
}
data_points.append(data_point)
else:
filtered_out_of_bounds += 1
except (KeyError, IndexError, ValueError) as e:
print(f"Warning: Error processing data for node {node_num}: {e}")
continue
# Print filtering statistics
valid_points = len(data_points)
print(f"\n=== Data Filtering Statistics ===")
print(f"Total data points processed: {total_data_points}")
print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
print(f"Filtered out - Above roofline: {filtered_above_roof}")
print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
print(f"Valid data points collected: {valid_points}")
print(f"Data retention rate: {(valid_points/total_data_points)*100:.1f}%" if total_data_points > 0 else "No data points")
if not data_points:
print("\nNo valid data points found within the bounds - skipping dataframe creation")
return False, None # Return False to indicate no dataframe was created
# Create DataFrame from collected data points
df = pd.DataFrame(data_points)
# Add derived features useful for ML
df['intensity_to_scalar_knee_ratio'] = df['arith_intensity'] / df['scalar_knee_intensity']
df['intensity_to_simd_knee_ratio'] = df['arith_intensity'] / df['simd_knee_intensity']
df['performance_to_scalar_peak_ratio'] = df['performance_gflops'] / df['scalar_peak_gflops']
df['performance_to_simd_peak_ratio'] = df['performance_gflops'] / df['simd_peak_gflops']
df['bandwidth_to_memory_bw_ratio'] = df['bandwidth_raw'] / df['memory_bw_gbs']
# Add categorical encodings for roofline regions
df['is_memory_bound'] = (df['roofline_region'] == 'memory_bound').astype(int)
df['is_scalar_compute_bound'] = (df['roofline_region'] == 'scalar_compute_bound').astype(int)
df['is_simd_compute_bound'] = (df['roofline_region'] == 'simd_compute_bound').astype(int)
df['is_compute_bound'] = (df['roofline_region'] == 'compute_bound').astype(int)
print(f"\nCreated DataFrame with {len(df)} rows and {len(df.columns)} columns")
print(f"Columns: {list(df.columns)}")
# Save DataFrame if output path is provided
if output_path:
# Create output directory if it doesn't exist
output_dir = Path(output_path).parent
output_dir.mkdir(parents=True, exist_ok=True)
saved_files = []
base_path = Path(output_path)
if save_format in ['csv', 'both', 'compressed']:
# Save as CSV
if save_format == 'compressed':
csv_path = base_path.with_suffix('.csv.gz')
df.to_csv(csv_path, index=False, compression='gzip')
print(f" Compressed CSV: {csv_path}")
else:
csv_path = base_path.with_suffix('.csv')
df.to_csv(csv_path, index=False)
print(f" CSV: {csv_path}")
saved_files.append(str(csv_path))
if save_format in ['pickle', 'both', 'compressed']:
# Save as pickle
if save_format == 'compressed':
pickle_path = base_path.with_suffix('.pkl.gz')
df.to_pickle(pickle_path, compression='gzip')
print(f" Compressed Pickle: {pickle_path}")
else:
pickle_path = base_path.with_suffix('.pkl')
df.to_pickle(pickle_path)
print(f" Pickle: {pickle_path}")
saved_files.append(str(pickle_path))
if save_format == 'parquet':
# Save as Parquet (compressed by default)
try:
parquet_path = base_path.with_suffix('.parquet')
df.to_parquet(parquet_path, index=False)
print(f" Parquet: {parquet_path}")
saved_files.append(str(parquet_path))
except ImportError:
print(" Warning: Parquet format requires 'pyarrow' or 'fastparquet'. Install with: pip install pyarrow")
if save_format == 'hdf5':
# Save as HDF5
try:
hdf5_path = base_path.with_suffix('.h5')
df.to_hdf(hdf5_path, key='dataframe', mode='w', index=False)
print(f" HDF5: {hdf5_path}")
saved_files.append(str(hdf5_path))
except ImportError:
print(" Warning: HDF5 format requires 'tables'. Install with: pip install tables")
if saved_files:
print(f"\nSuccessfully saved dataframe in {save_format} format(s)")
print(f"Files saved: {', '.join(saved_files)}")
else:
print(f"\nWarning: No files saved. Format '{save_format}' may not be supported or required packages missing.")
return True, df # Return True and the dataframe
else:
print("\nDataFrame created but not saved (no output path provided)")
return True, df # Return True and the dataframe
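# Minimal single-job usage sketch (paths below are placeholders, not shipped data):
#   data, meta = load_job_data_from_folder("archive/123/456/1690000000")
#   cfg = load_cluster_config_from_folder("archive")
#   ok, df = create_roofline_dataframe(data, meta, cfg,
#                                      output_path="out/123456_1690000000_dataframe",
#                                      save_format="parquet")
#   if ok:
#       print(df[['arith_intensity', 'performance_gflops', 'roofline_region']].head())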
def process_single_job(args):
"""Process a single job - designed for multiprocessing"""
job_path, cluster_config, output_base_dir, save_format = args
try:
# Parse job directory structure: {prefix}/{suffix}/{timestamp}
parts = job_path.parts
if len(parts) < 3:
return False, f"Invalid job path structure: {job_path}"
job_id_prefix = parts[-3]
job_id_suffix = parts[-2]
timestamp = parts[-1]
job_id = job_id_prefix + job_id_suffix
# Load job data
data, meta = load_job_data_from_folder(job_path)
# Check job duration - skip jobs less than 60 seconds
duration = meta.get('duration', 0)
if duration < 60:
return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"
# Create output directory for this job prefix
output_dir = Path(output_base_dir) / job_id_prefix
output_dir.mkdir(parents=True, exist_ok=True)
# Generate dataframe
base_filename = f"{job_id}_{timestamp}"
dataframe_path = output_dir / f"{base_filename}_dataframe"
success, df = create_roofline_dataframe(data, meta, cluster_config,
output_path=str(dataframe_path), save_format=save_format)
# Check if dataframe was created successfully
if success and df is not None:
return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s, {len(df)} data points)"
else:
return False, f"Skipped job {job_id}_{timestamp}: no valid data to create dataframe"
except Exception as e:
return False, f"Error processing job {job_path}: {e}"
def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None, save_format='both'):
"""Process all jobs in the folder structure using multiprocessing"""
print(f"Processing jobs from: {base_path}")
print(f"Output directory: {output_base_dir}")
print(f"Save format: {save_format}")
# Load cluster configuration
try:
cluster_config = load_cluster_config_from_folder(base_path)
print("Loaded cluster configuration")
except FileNotFoundError as e:
print(f"Error loading cluster config: {e}")
return
# Load already processed jobs
done_set = load_done_list(done_file_path)
print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")
# Find all job directories
job_dirs = find_job_directories(base_path)
print(f"Found {len(job_dirs)} total job directories")
# Filter out already processed jobs
filtered_job_dirs = []
for job_path in job_dirs:
parts = job_path.parts
if len(parts) >= 3:
job_id_prefix = parts[-3]
if job_id_prefix not in done_set:
filtered_job_dirs.append(job_path)
print(f"Found {len(filtered_job_dirs)} unprocessed job directories")
if not filtered_job_dirs:
print("No new jobs to process")
return
# Determine number of processes
if num_processes is None:
num_processes = min(12, cpu_count()) # Use up to 12 cores or available cores
print(f"Using {num_processes} processes for parallel processing")
# Prepare arguments for multiprocessing
process_args = [(job_path, cluster_config, output_base_dir, save_format) for job_path in filtered_job_dirs]
processed_count = 0
skipped_count = 0
processed_prefixes = set()
# Process jobs in parallel
if num_processes == 1:
# Single-threaded processing for debugging
results = [process_single_job(args) for args in process_args]
else:
# Parallel processing across worker processes
with Pool(num_processes) as pool:
results = pool.map(process_single_job, process_args)
# Collect results
for i, (success, message) in enumerate(results):
print(message)
if success:
processed_count += 1
# Extract job prefix for done tracking
job_path = filtered_job_dirs[i]
parts = job_path.parts
if len(parts) >= 3:
job_id_prefix = parts[-3]
processed_prefixes.add(job_id_prefix)
else:
skipped_count += 1
# Update done list with newly processed prefixes
if processed_prefixes:
updated_done_set = done_set | processed_prefixes
save_done_list(done_file_path, updated_done_set)
print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")
print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs (duration < 60s or no valid data)")
def main():
parser = argparse.ArgumentParser(description='Generate roofline dataframes from folder structure for ML training')
parser.add_argument('--base-path', default=r'D:\fritz',
help='Path to job archive folder')
parser.add_argument('--output-dir', default='roofline_dataframes',
help='Output directory for dataframes')
parser.add_argument('--done-file', default='done.txt',
help='File to track processed job prefixes')
parser.add_argument('--processes', type=int, default=12,
help='Number of parallel processes to use (default: 12)')
parser.add_argument('--save-format', default='both',
choices=['csv', 'pickle', 'both', 'compressed', 'parquet', 'hdf5'],
help='Format to save dataframes (default: both)')
args = parser.parse_args()
try:
process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes, args.save_format)
except Exception as e:
print(f"Error processing jobs: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
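# Example invocations (paths are placeholders):
#   python roofline_dataframe_generator.py --ml-example
#   python roofline_dataframe_generator.py --base-path /data/job-archive \
#       --output-dir roofline_dataframes --processes 4 --save-format compressed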