Add roofline_dataframe_generator.py
roofline_dataframe_generator.py (new file, 646 lines)
@@ -0,0 +1,646 @@
#!/usr/bin/env python3
import os
import sys
import gzip
import json
import argparse
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime
from multiprocessing import Pool, cpu_count
import functools


def create_ml_example():
    """
    Example function showing how to use the generated dataframes for ML training
    """
    print("=== Machine Learning Example with Roofline Data ===")
    print("This is an example of how to use the generated dataframes for training ML models.")
    print("The dataframes contain features suitable for various ML tasks:")
    print("- Regression: Predict performance (performance_gflops)")
    print("- Classification: Predict roofline region (roofline_region)")
    print("- Clustering: Group similar performance patterns")
    print()

    # Example feature columns available in the dataframes
    feature_columns = [
        'arith_intensity', 'bandwidth_raw', 'flops_raw',
        'intensity_to_scalar_knee_ratio', 'intensity_to_simd_knee_ratio',
        'performance_to_scalar_peak_ratio', 'performance_to_simd_peak_ratio',
        'bandwidth_to_memory_bw_ratio', 'efficiency',
        'is_memory_bound', 'is_scalar_compute_bound',
        'is_simd_compute_bound', 'is_compute_bound'
    ]

    target_columns = [
        'performance_gflops',  # For regression
        'roofline_region',     # For classification
        'efficiency'           # For regression
    ]

    print("Available features:")
    for i, feature in enumerate(feature_columns, 1):
        print(f" {i:2d}. {feature}")

    print("\nAvailable targets:")
    for i, target in enumerate(target_columns, 1):
        print(f" {i:2d}. {target}")

    print("\nExample scikit-learn code:")
    print("""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import mean_squared_error, r2_score

    # Load dataframe
    df = pd.read_csv('path/to/dataframe.csv')

    # Prepare features and target
    features = ['arith_intensity', 'bandwidth_raw', 'flops_raw', 'efficiency']
    X = df[features]
    y = df['performance_gflops']

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train model
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    print(f"R² Score: {r2_score(y_test, y_pred):.4f}")
    print(f"RMSE: {mean_squared_error(y_test, y_pred) ** 0.5:.4f}")
    """)


if __name__ == "__main__":
    # Check if this is being run as an example
    if len(sys.argv) > 1 and sys.argv[1] == '--ml-example':
        create_ml_example()
        sys.exit(0)

# ... existing code ...
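
# Illustrative sketch (kept as a comment, not executed): a classification counterpart to
# the regression example printed above, predicting the 'roofline_region' label. Assumes
# scikit-learn is installed and `df` is a dataframe produced by this script.
#
#   from sklearn.model_selection import train_test_split
#   from sklearn.ensemble import RandomForestClassifier
#   from sklearn.metrics import accuracy_score
#
#   features = ['arith_intensity', 'bandwidth_raw', 'flops_raw', 'efficiency']
#   X_train, X_test, y_train, y_test = train_test_split(
#       df[features], df['roofline_region'], test_size=0.2, random_state=42)
#   clf = RandomForestClassifier(n_estimators=100, random_state=42)
#   clf.fit(X_train, y_train)
#   print(f"Accuracy: {accuracy_score(y_test, clf.predict(X_test)):.4f}")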


def load_job_data_from_folder(job_path):
    """Load job data from folder"""
    job_path = Path(job_path)

    # Load data.json.gz
    data_file_path = job_path / "data.json.gz"
    if not data_file_path.exists():
        raise FileNotFoundError(f"data.json.gz not found in {job_path}")

    with gzip.open(data_file_path, 'rt') as f:
        data = json.load(f)

    # Load meta.json
    meta_file_path = job_path / "meta.json"
    if not meta_file_path.exists():
        raise FileNotFoundError(f"meta.json not found in {job_path}")

    with open(meta_file_path, 'r') as f:
        meta = json.load(f)

    return data, meta


def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
    """Load cluster configuration data from folder"""
    config_path = Path(base_path) / cluster_path
    if not config_path.exists():
        raise FileNotFoundError(f"cluster.json not found at {config_path}")

    with open(config_path, 'r') as f:
        return json.load(f)
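
# The two loaders above assume a job-archive layout (ClusterCockpit-style) in which each
# job directory holds a gzip-compressed data.json.gz with per-node metric series and a
# plain meta.json with job metadata. Only the fields this script actually reads are
# sketched below; the values are hypothetical:
#
#   meta.json:     {"jobId": 1234567, "numNodes": 2, "subCluster": "main", "duration": 3600}
#   data.json.gz:  {"mem_bw":    {"node": {"series": [{"data": [80.0, ...]}, ...]}},
#                   "flops_any": {"node": {"series": [{"data": [40.0, ...]}, ...]}}}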


def find_job_directories(base_path):
    """Find all job directories in the folder structure"""
    base_path = Path(base_path)
    job_dirs = []

    # Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
    for prefix_dir in base_path.iterdir():
        if not prefix_dir.is_dir():
            continue
        for suffix_dir in prefix_dir.iterdir():
            if not suffix_dir.is_dir():
                continue
            for timestamp_dir in suffix_dir.iterdir():
                if not timestamp_dir.is_dir():
                    continue
                # Check if this directory contains both meta.json and data.json.gz
                if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
                    job_dirs.append(timestamp_dir)

    return job_dirs
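
# Expected on-disk layout (illustrative): job directories sit exactly three levels below
# the base path, and a directory only counts as a job if both files are present:
#
#   <base_path>/
#       cluster.json
#       <prefix>/<suffix>/<timestamp>/
#           meta.json
#           data.json.gz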


def load_done_list(done_file_path):
    """Load list of already processed job prefixes from done.txt"""
    done_file = Path(done_file_path)
    if not done_file.exists():
        return set()

    with open(done_file, 'r') as f:
        return set(line.strip() for line in f if line.strip())


def save_done_list(done_file_path, done_set):
    """Save list of processed job prefixes to done.txt"""
    with open(done_file_path, 'w') as f:
        for prefix in sorted(done_set):
            f.write(f"{prefix}\n")
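
# done.txt is a plain-text checkpoint with one processed job-ID prefix per line, e.g.
# (hypothetical prefixes):
#
#   100
#   101
#   102
#
# Note that completion is tracked per prefix, not per job: once any job under a prefix
# has been processed successfully, the whole prefix is skipped on subsequent runs.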


def navigate_nested_dict(d, path):
    """Navigate a nested dictionary using a dot-separated path with array indices"""
    if not path:
        return d

    current = d
    parts = path.split('.')

    for part in parts:
        if '[' in part and ']' in part:
            # Handle explicit array access syntax (e.g., "node[0]")
            array_name = part.split('[')[0]
            index = int(part.split('[')[1].split(']')[0])
            current = current[array_name][index]
        elif part.isdigit():
            # Handle numeric indices for list access
            current = current[int(part)]
        else:
            # Handle regular dict access
            current = current[part]

    return current
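
# Usage sketch for navigate_nested_dict (hypothetical config dict); both the dotted
# numeric index and the bracket syntax resolve to the same element:
#
#   cfg = {"subClusters": [{"flopRateScalar": {"value": 48}}]}
#   navigate_nested_dict(cfg, "subClusters.0.flopRateScalar.value")   # -> 48
#   navigate_nested_dict(cfg, "subClusters[0].flopRateScalar.value")  # -> 48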


def get_subcluster_config(cluster_config, subcluster_name):
    """Get the index of a specific subcluster by name"""
    subclusters = cluster_config.get("subClusters", [])

    for i, subcluster in enumerate(subclusters):
        if subcluster.get("name") == subcluster_name:
            return i

    return 0  # Default to first subcluster if not found


def get_performance_value(cluster_config, subcluster_idx, metric_path):
    """Extract a performance value with its proper unit prefix applied"""
    # Build the full path
    full_path = f"subClusters.{subcluster_idx}.{metric_path}"

    # Extract values
    value_parts = full_path.split('.')

    # Direct dictionary traversal instead of using navigate_nested_dict
    current = cluster_config
    for part in value_parts:
        if part.isdigit():
            current = current[int(part)]
        else:
            current = current[part]

    value = current.get("value", 0)

    # Get prefix if available
    prefix = ""
    if "unit" in current and "prefix" in current["unit"]:
        prefix = current["unit"]["prefix"]

    # Apply prefix multiplier
    prefix_multipliers = {
        'K': 1e3,
        'M': 1e6,
        'G': 1e9,
        'T': 1e12,
        'P': 1e15,
        '': 1.0
    }

    multiplier = prefix_multipliers.get(prefix, 1.0)
    return value * multiplier
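
# get_performance_value resolves a metric under the chosen subcluster and scales its
# value by the SI unit prefix, so callers receive base units (FLOP/s or byte/s).
# Illustrative cluster.json fragment with hypothetical numbers:
#
#   "subClusters": [{"name": "main",
#                    "flopRateScalar":  {"value": 48,  "unit": {"prefix": "G"}},
#                    "flopRateSimd":    {"value": 432, "unit": {"prefix": "G"}},
#                    "memoryBandwidth": {"value": 350, "unit": {"prefix": "G"}}}]
#
#   get_performance_value(cluster_config, 0, "flopRateScalar")  # -> 48e9 FLOP/s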


def create_roofline_dataframe(data, meta, cluster_config, output_path=None, save_format='both'):
    """Create roofline dataframe with data points for ML training

    Args:
        data: Job performance data
        meta: Job metadata
        cluster_config: Cluster configuration
        output_path: Path to save the dataframe (without extension)
        save_format: Format to save ('csv', 'pickle', 'parquet', 'hdf5', 'both', 'compressed')
    """
    # Get number of nodes and subcluster name
    num_nodes = meta['numNodes']
    subcluster_name = meta.get('subCluster', 'main')  # Default to 'main' if not specified

    # Get subcluster index from the cluster configuration
    subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)

    # Get performance ceilings from cluster configuration
    scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
    simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
    mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")

    print(f"=== Roofline DataFrame Configuration ===")
    print(f"Subcluster: {subcluster_name}")
    print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
    print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
    print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
    print(f"Number of Nodes: {num_nodes}")

    # Convert to GFLOP/s for consistency
    scalar_peak_gflops = scalar_flops / 1e9
    simd_peak_gflops = simd_flops / 1e9
    mem_bw_bytes = mem_bw  # Already in bytes/s from get_performance_value

    # Calculate knee points for rooflines
    scalar_knee_intensity = scalar_flops / mem_bw_bytes  # FLOP/byte
    simd_knee_intensity = simd_flops / mem_bw_bytes  # FLOP/byte

    print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
    print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")

    print(f"\n=== Data Points ===")
    print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
    print("-" * 75)

    # Initialize lists to collect data points
    data_points = []

    # Statistics for data filtering
    total_data_points = 0
    filtered_invalid = 0
    filtered_above_roof = 0
    filtered_out_of_bounds = 0

    # Fixed bounds for consistency with original plotting
    x_min, x_max = 0.01, 1000.0  # FLOP/byte
    y_min, y_max = 1.0, 10000.0  # GFLOP/s

    for node_num in range(num_nodes):
        try:
            # Extract data series
            mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
            flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])

            if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
                continue

            mem_bw_data = mem_bw_series[node_num].get("data", [])
            flops_data = flops_series[node_num].get("data", [])

            # Ensure we have the same number of time steps for both metrics
            num_timesteps = min(len(mem_bw_data), len(flops_data))
            if num_timesteps == 0:
                continue

            total_data_points += num_timesteps

            # Process each time step
            for t_idx in range(num_timesteps):
                # Get data values (assuming raw data is already in appropriate units)
                bw_raw = mem_bw_data[t_idx]  # Memory bandwidth (raw value)
                flops_raw = flops_data[t_idx]  # Performance in FLOP/s (raw value)

                # Skip if we have invalid data (None, NaN, or non-positive values)
                if (bw_raw is None or flops_raw is None or
                        (isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
                        (isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
                        bw_raw <= 0 or flops_raw <= 0):
                    filtered_invalid += 1
                    continue

                # For arithmetic intensity calculation, we need consistent units
                flops_gflops = flops_raw  # Assume raw is already in GFLOP/s
                bw_gb_per_sec = bw_raw  # Assume raw is in GB/s

                # Calculate AI in FLOP/byte: (GFLOP/s) / (GB/s) = FLOP/byte
                arith_intensity = flops_gflops / bw_gb_per_sec

                # Additional validation: check for reasonable ranges
                # Skip if arithmetic intensity is too extreme (likely measurement error)
                if arith_intensity < 1e-6 or arith_intensity > 1e6:
                    filtered_invalid += 1
                    continue

                # Use raw FLOPS value for analysis (assume it's already in GFLOP/s)
                performance_gflops = flops_raw

                # Check if performance exceeds the theoretical maximum (above roofline)
                # Use the higher of SIMD or scalar peak performance as the ceiling
                max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)

                # Also check against memory bandwidth ceiling at this intensity
                mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9)  # Convert to GFLOP/s
                effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)

                # Skip unrealistic data points that are significantly above the roofline
                # Allow for some measurement tolerance (e.g., 5% above theoretical max)
                if performance_gflops > effective_ceiling * 1.05:
                    filtered_above_roof += 1
                    continue

                # Print key data points for monitoring
                if t_idx % 10 == 0:  # Print every 10th point to avoid spam
                    print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_gflops:15.2f} {arith_intensity:15.4f}")

                # Check if points are within our fixed bounds
                if (x_min <= arith_intensity <= x_max and
                        y_min <= performance_gflops <= y_max):

                    # Determine which roofline region this point falls into
                    if arith_intensity <= min(scalar_knee_intensity, simd_knee_intensity):
                        # Memory-bound region
                        roofline_region = "memory_bound"
                        theoretical_max = arith_intensity * (mem_bw_bytes / 1e9)
                    elif arith_intensity <= max(scalar_knee_intensity, simd_knee_intensity):
                        # Mixed region - determine which compute roof is limiting
                        if scalar_knee_intensity <= arith_intensity <= simd_knee_intensity:
                            roofline_region = "scalar_compute_bound"
                            theoretical_max = scalar_peak_gflops
                        else:
                            roofline_region = "simd_compute_bound"
                            theoretical_max = simd_peak_gflops
                    else:
                        # Compute-bound region
                        roofline_region = "compute_bound"
                        theoretical_max = max(simd_peak_gflops, scalar_peak_gflops)

                    # Calculate efficiency relative to theoretical maximum
                    efficiency = performance_gflops / theoretical_max if theoretical_max > 0 else 0

                    # Create data point dictionary
                    data_point = {
                        'node_num': node_num,
                        'timestep': t_idx,
                        'bandwidth_raw': bw_raw,
                        'flops_raw': flops_raw,
                        'performance_gflops': performance_gflops,
                        'arith_intensity': arith_intensity,
                        'roofline_region': roofline_region,
                        'theoretical_max_gflops': theoretical_max,
                        'efficiency': efficiency,
                        'scalar_peak_gflops': scalar_peak_gflops,
                        'simd_peak_gflops': simd_peak_gflops,
                        'memory_bw_gbs': mem_bw_bytes / 1e9,
                        'scalar_knee_intensity': scalar_knee_intensity,
                        'simd_knee_intensity': simd_knee_intensity,
                        'subcluster_name': subcluster_name,
                        'job_id': meta.get('jobId', 'unknown'),
                        'duration': meta.get('duration', 0)
                    }

                    data_points.append(data_point)
                else:
                    filtered_out_of_bounds += 1

        except (KeyError, IndexError, ValueError) as e:
            print(f"Warning: Error processing data for node {node_num}: {e}")
            continue

    # Print filtering statistics
    valid_points = len(data_points)
    print(f"\n=== Data Filtering Statistics ===")
    print(f"Total data points processed: {total_data_points}")
    print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
    print(f"Filtered out - Above roofline: {filtered_above_roof}")
    print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
    print(f"Valid data points collected: {valid_points}")
    print(f"Data retention rate: {(valid_points/total_data_points)*100:.1f}%" if total_data_points > 0 else "No data points")

    if not data_points:
        print("\nNo valid data points found within the bounds - skipping dataframe creation")
        return False, None  # Return False to indicate no dataframe was created

    # Create DataFrame from collected data points
    df = pd.DataFrame(data_points)

    # Add derived features useful for ML
    df['intensity_to_scalar_knee_ratio'] = df['arith_intensity'] / df['scalar_knee_intensity']
    df['intensity_to_simd_knee_ratio'] = df['arith_intensity'] / df['simd_knee_intensity']
    df['performance_to_scalar_peak_ratio'] = df['performance_gflops'] / df['scalar_peak_gflops']
    df['performance_to_simd_peak_ratio'] = df['performance_gflops'] / df['simd_peak_gflops']
    df['bandwidth_to_memory_bw_ratio'] = df['bandwidth_raw'] / df['memory_bw_gbs']

    # Add categorical encodings for roofline regions
    df['is_memory_bound'] = (df['roofline_region'] == 'memory_bound').astype(int)
    df['is_scalar_compute_bound'] = (df['roofline_region'] == 'scalar_compute_bound').astype(int)
    df['is_simd_compute_bound'] = (df['roofline_region'] == 'simd_compute_bound').astype(int)
    df['is_compute_bound'] = (df['roofline_region'] == 'compute_bound').astype(int)

    print(f"\nCreated DataFrame with {len(df)} rows and {len(df.columns)} columns")
    print(f"Columns: {list(df.columns)}")

    # Save DataFrame if output path is provided
    if output_path:
        # Create output directory if it doesn't exist
        output_dir = Path(output_path).parent
        output_dir.mkdir(parents=True, exist_ok=True)

        saved_files = []
        base_path = Path(output_path)

        if save_format in ['csv', 'both', 'compressed']:
            # Save as CSV
            if save_format == 'compressed':
                csv_path = base_path.with_suffix('.csv.gz')
                df.to_csv(csv_path, index=False, compression='gzip')
                print(f" Compressed CSV: {csv_path}")
            else:
                csv_path = base_path.with_suffix('.csv')
                df.to_csv(csv_path, index=False)
                print(f" CSV: {csv_path}")
            saved_files.append(str(csv_path))

        if save_format in ['pickle', 'both', 'compressed']:
            # Save as pickle
            if save_format == 'compressed':
                pickle_path = base_path.with_suffix('.pkl.gz')
                df.to_pickle(pickle_path, compression='gzip')
                print(f" Compressed Pickle: {pickle_path}")
            else:
                pickle_path = base_path.with_suffix('.pkl')
                df.to_pickle(pickle_path)
                print(f" Pickle: {pickle_path}")
            saved_files.append(str(pickle_path))

        if save_format == 'parquet':
            # Save as Parquet (compressed by default)
            try:
                parquet_path = base_path.with_suffix('.parquet')
                df.to_parquet(parquet_path, index=False)
                print(f" Parquet: {parquet_path}")
                saved_files.append(str(parquet_path))
            except ImportError:
                print(" Warning: Parquet format requires 'pyarrow' or 'fastparquet'. Install with: pip install pyarrow")

        if save_format == 'hdf5':
            # Save as HDF5
            try:
                hdf5_path = base_path.with_suffix('.h5')
                df.to_hdf(hdf5_path, key='dataframe', mode='w', index=False)
                print(f" HDF5: {hdf5_path}")
                saved_files.append(str(hdf5_path))
            except ImportError:
                print(" Warning: HDF5 format requires 'tables'. Install with: pip install tables")

        if saved_files:
            print(f"\nSuccessfully saved dataframe in {save_format} format(s)")
            print(f"Files saved: {', '.join(saved_files)}")
        else:
            print(f"\nWarning: No files saved. Format '{save_format}' may not be supported or required packages missing.")

        return True, df  # Return True and the dataframe
    else:
        print("\nDataFrame created but not saved (no output path provided)")
        return True, df  # Return True and the dataframe
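
# Worked example of the roofline bookkeeping above (hypothetical subcluster with a
# 48 GFLOP/s scalar roof, a 432 GFLOP/s SIMD roof and 350 GB/s memory bandwidth):
#
#   scalar knee = 48 / 350  ~= 0.137 FLOP/byte
#   simd knee   = 432 / 350 ~= 1.234 FLOP/byte
#
# A sample measuring 40 GFLOP/s at 80 GB/s has AI = 40 / 80 = 0.5 FLOP/byte. That falls
# between the two knees, so it is labelled 'scalar_compute_bound' with
# theoretical_max_gflops = 48 and efficiency = 40 / 48 ~= 0.83; its bandwidth-roof
# ceiling at that intensity would be 0.5 * 350 = 175 GFLOP/s, so the above-roofline
# filter keeps it.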


def process_single_job(args):
    """Process a single job - designed for multiprocessing"""
    job_path, cluster_config, output_base_dir, save_format = args

    try:
        # Parse job directory structure: {prefix}/{suffix}/{timestamp}
        parts = job_path.parts
        if len(parts) < 3:
            return False, f"Invalid job path structure: {job_path}"

        job_id_prefix = parts[-3]
        job_id_suffix = parts[-2]
        timestamp = parts[-1]
        job_id = job_id_prefix + job_id_suffix

        # Load job data
        data, meta = load_job_data_from_folder(job_path)

        # Check job duration - skip jobs less than 60 seconds
        duration = meta.get('duration', 0)
        if duration < 60:
            return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"

        # Create output directory for this job prefix
        output_dir = Path(output_base_dir) / job_id_prefix
        output_dir.mkdir(parents=True, exist_ok=True)

        # Generate dataframe
        base_filename = f"{job_id}_{timestamp}"
        dataframe_path = output_dir / f"{base_filename}_dataframe"

        success, df = create_roofline_dataframe(data, meta, cluster_config,
                                                output_path=str(dataframe_path), save_format=save_format)

        # Check if dataframe was created successfully
        if success and df is not None:
            return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s, {len(df)} data points)"
        else:
            return False, f"Skipped job {job_id}_{timestamp}: no valid data to create dataframe"

    except Exception as e:
        return False, f"Error processing job {job_path}: {e}"
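
# Resulting output layout per job (illustrative, with the default save_format='both'):
#
#   <output_base_dir>/<prefix>/<prefix><suffix>_<timestamp>_dataframe.csv
#   <output_base_dir>/<prefix>/<prefix><suffix>_<timestamp>_dataframe.pkl
#
# where the job ID is the concatenation of the prefix and suffix directory names.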


def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None, save_format='both'):
    """Process all jobs in the folder structure using multiprocessing"""
    print(f"Processing jobs from: {base_path}")
    print(f"Output directory: {output_base_dir}")
    print(f"Save format: {save_format}")

    # Load cluster configuration
    try:
        cluster_config = load_cluster_config_from_folder(base_path)
        print("Loaded cluster configuration")
    except FileNotFoundError as e:
        print(f"Error loading cluster config: {e}")
        return

    # Load already processed jobs
    done_set = load_done_list(done_file_path)
    print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")

    # Find all job directories
    job_dirs = find_job_directories(base_path)
    print(f"Found {len(job_dirs)} total job directories")

    # Filter out already processed jobs
    filtered_job_dirs = []
    for job_path in job_dirs:
        parts = job_path.parts
        if len(parts) >= 3:
            job_id_prefix = parts[-3]
            if job_id_prefix not in done_set:
                filtered_job_dirs.append(job_path)

    print(f"Found {len(filtered_job_dirs)} unprocessed job directories")

    if not filtered_job_dirs:
        print("No new jobs to process")
        return

    # Determine number of processes
    if num_processes is None:
        num_processes = min(12, cpu_count())  # Use up to 12 cores or available cores

    print(f"Using {num_processes} processes for parallel processing")

    # Prepare arguments for multiprocessing
    process_args = [(job_path, cluster_config, output_base_dir, save_format) for job_path in filtered_job_dirs]

    processed_count = 0
    skipped_count = 0
    processed_prefixes = set()

    # Process jobs in parallel
    if num_processes == 1:
        # Sequential processing for debugging
        results = [process_single_job(args) for args in process_args]
    else:
        # Parallel processing with a worker pool
        with Pool(num_processes) as pool:
            results = pool.map(process_single_job, process_args)

    # Collect results
    for i, (success, message) in enumerate(results):
        print(message)
        if success:
            processed_count += 1
            # Extract job prefix for done tracking
            job_path = filtered_job_dirs[i]
            parts = job_path.parts
            if len(parts) >= 3:
                job_id_prefix = parts[-3]
                processed_prefixes.add(job_id_prefix)
        else:
            skipped_count += 1

    # Update done list with newly processed prefixes
    if processed_prefixes:
        updated_done_set = done_set | processed_prefixes
        save_done_list(done_file_path, updated_done_set)
        print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")

    print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs (duration < 60s or no valid data)")


def main():
    parser = argparse.ArgumentParser(description='Generate roofline dataframes from folder structure for ML training')
    parser.add_argument('--base-path', default=r'D:\fritz',
                        help='Path to job archive folder')
    parser.add_argument('--output-dir', default='roofline_dataframes',
                        help='Output directory for dataframes')
    parser.add_argument('--done-file', default='done.txt',
                        help='File to track processed job prefixes')
    parser.add_argument('--processes', type=int, default=12,
                        help='Number of parallel processes to use (default: 12)')
    parser.add_argument('--save-format', default='both',
                        choices=['csv', 'pickle', 'both', 'compressed', 'parquet', 'hdf5'],
                        help='Format to save dataframes (default: both)')

    args = parser.parse_args()

    try:
        process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes, args.save_format)
    except Exception as e:
        print(f"Error processing jobs: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
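
# Example invocations (paths and process count are illustrative):
#
#   python roofline_dataframe_generator.py --ml-example
#   python roofline_dataframe_generator.py --base-path D:\fritz --output-dir roofline_dataframes --processes 8 --save-format compressed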