#!/usr/bin/env python3
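"""Generate roofline-model dataframes from a job archive for ML training.

The script walks a job archive laid out as {prefix}/{suffix}/{timestamp} folders
(each containing meta.json and data.json.gz, with cluster.json at the archive
root), derives arithmetic intensity, roofline region, efficiency, and related
features per node and timestep, and writes one dataframe per job in the chosen
format(s). Run with --ml-example to print a short scikit-learn usage example.
"""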
import os
import sys
import gzip
import json
import argparse
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime
from multiprocessing import Pool, cpu_count
import functools

def create_ml_example():
    """
    Example function showing how to use the generated dataframes for ML training.
    """
    print("=== Machine Learning Example with Roofline Data ===")
    print("This is an example of how to use the generated dataframes for training ML models.")
    print("The dataframes contain features suitable for various ML tasks:")
    print("- Regression: Predict performance (performance_gflops)")
    print("- Classification: Predict roofline region (roofline_region)")
    print("- Clustering: Group similar performance patterns")
    print()

    # Example feature columns available in the dataframes
    feature_columns = [
        'arith_intensity', 'bandwidth_raw', 'flops_raw',
        'intensity_to_scalar_knee_ratio', 'intensity_to_simd_knee_ratio',
        'performance_to_scalar_peak_ratio', 'performance_to_simd_peak_ratio',
        'bandwidth_to_memory_bw_ratio', 'efficiency',
        'is_memory_bound', 'is_scalar_compute_bound',
        'is_simd_compute_bound', 'is_compute_bound'
    ]

    target_columns = [
        'performance_gflops',  # For regression
        'roofline_region',     # For classification
        'efficiency'           # For regression
    ]

    print("Available features:")
    for i, feature in enumerate(feature_columns, 1):
        print(f"  {i:2d}. {feature}")

    print("\nAvailable targets:")
    for i, target in enumerate(target_columns, 1):
        print(f"  {i:2d}. {target}")

    print("\nExample scikit-learn code:")
    print("""
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

# Load dataframe
df = pd.read_csv('path/to/dataframe.csv')

# Prepare features and target
features = ['arith_intensity', 'bandwidth_raw', 'flops_raw', 'efficiency']
X = df[features]
y = df['performance_gflops']

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
print(f"R² Score: {r2_score(y_test, y_pred):.4f}")
# Take the square root of the MSE; this works across scikit-learn versions
# (the `squared=False` keyword was removed in newer releases).
print(f"RMSE: {mean_squared_error(y_test, y_pred) ** 0.5:.4f}")
""")


if __name__ == "__main__":
    # Check if this is being run as an example
    if len(sys.argv) > 1 and sys.argv[1] == '--ml-example':
        create_ml_example()
        sys.exit(0)

# ... existing code ...

def load_job_data_from_folder(job_path):
    """Load job data from folder"""
    job_path = Path(job_path)

    # Load data.json.gz
    data_file_path = job_path / "data.json.gz"
    if not data_file_path.exists():
        raise FileNotFoundError(f"data.json.gz not found in {job_path}")

    with gzip.open(data_file_path, 'rt') as f:
        data = json.load(f)

    # Load meta.json
    meta_file_path = job_path / "meta.json"
    if not meta_file_path.exists():
        raise FileNotFoundError(f"meta.json not found in {job_path}")

    with open(meta_file_path, 'r') as f:
        meta = json.load(f)

    return data, meta


def load_cluster_config_from_folder(base_path, cluster_path="cluster.json"):
    """Load cluster configuration data from folder"""
    config_path = Path(base_path) / cluster_path
    if not config_path.exists():
        raise FileNotFoundError(f"cluster.json not found at {config_path}")

    with open(config_path, 'r') as f:
        return json.load(f)


def find_job_directories(base_path):
    """Find all job directories in the folder structure"""
    base_path = Path(base_path)
    job_dirs = []

    # Look for directories matching pattern: {prefix}/{suffix}/{timestamp}
    for prefix_dir in base_path.iterdir():
        if not prefix_dir.is_dir():
            continue
        for suffix_dir in prefix_dir.iterdir():
            if not suffix_dir.is_dir():
                continue
            for timestamp_dir in suffix_dir.iterdir():
                if not timestamp_dir.is_dir():
                    continue
                # Check if this directory contains both meta.json and data.json.gz
                if (timestamp_dir / "meta.json").exists() and (timestamp_dir / "data.json.gz").exists():
                    job_dirs.append(timestamp_dir)

    return job_dirs
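
# Expected archive layout, as traversed by find_job_directories and
# load_cluster_config_from_folder (names in braces are placeholders):
#
#   {base_path}/
#       cluster.json
#       {prefix}/
#           {suffix}/
#               {timestamp}/
#                   meta.json
#                   data.json.gz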

def load_done_list(done_file_path):
    """Load list of already processed job prefixes from done.txt"""
    done_file = Path(done_file_path)
    if not done_file.exists():
        return set()

    with open(done_file, 'r') as f:
        return set(line.strip() for line in f if line.strip())


def save_done_list(done_file_path, done_set):
    """Save list of processed job prefixes to done.txt"""
    with open(done_file_path, 'w') as f:
        for prefix in sorted(done_set):
            f.write(f"{prefix}\n")

def navigate_nested_dict(d, path):
    """Navigate a nested dictionary using a dot-separated path with array indices"""
    if not path:
        return d

    current = d
    parts = path.split('.')

    for part in parts:
        if '[' in part and ']' in part:
            # Handle explicit array access syntax (e.g., "node[0]")
            array_name = part.split('[')[0]
            index = int(part.split('[')[1].split(']')[0])
            current = current[array_name][index]
        elif part.isdigit():
            # Handle numeric indices for list access
            current = current[int(part)]
        else:
            # Handle regular dict access
            current = current[part]

    return current
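
# Usage sketch for navigate_nested_dict (illustrative; `cfg` is a placeholder):
#   navigate_nested_dict(cfg, "subClusters.0.flopRateSimd") walks
#   cfg["subClusters"][0]["flopRateSimd"], while the bracket syntax
#   "series[0].data" would resolve cfg["series"][0]["data"].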

def get_subcluster_config(cluster_config, subcluster_name):
    """Return the index of the subcluster with the given name (0 if not found)"""
    subclusters = cluster_config.get("subClusters", [])

    for i, subcluster in enumerate(subclusters):
        if subcluster.get("name") == subcluster_name:
            return i

    return 0  # Default to first subcluster if not found

def get_performance_value(cluster_config, subcluster_idx, metric_path):
    """Extract a performance value with its proper unit prefix"""
    # Build the full path
    full_path = f"subClusters.{subcluster_idx}.{metric_path}"

    # Split the path into its parts
    value_parts = full_path.split('.')

    # Direct dictionary traversal instead of using navigate_nested_dict
    current = cluster_config
    for part in value_parts:
        if part.isdigit():
            current = current[int(part)]
        else:
            current = current[part]

    value = current.get("value", 0)

    # Get prefix if available
    prefix = ""
    if "unit" in current and "prefix" in current["unit"]:
        prefix = current["unit"]["prefix"]

    # Apply prefix multiplier
    prefix_multipliers = {
        'K': 1e3,
        'M': 1e6,
        'G': 1e9,
        'T': 1e12,
        'P': 1e15,
        '': 1.0
    }

    multiplier = prefix_multipliers.get(prefix, 1.0)
    return value * multiplier
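
# Worked example for get_performance_value (illustrative numbers, not taken from a
# real cluster.json): a metric entry such as
#   {"value": 128, "unit": {"prefix": "G"}}
# under subClusters[0].memoryBandwidth yields 128 * 1e9 = 1.28e11, because the 'G'
# prefix maps to 1e9 in prefix_multipliers; the rest of the script then treats this
# value as bytes/s.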

def create_roofline_dataframe(data, meta, cluster_config, output_path=None, save_format='both'):
    """Create roofline dataframe with data points for ML training

    Args:
        data: Job performance data
        meta: Job metadata
        cluster_config: Cluster configuration
        output_path: Path to save the dataframe (without extension)
        save_format: Format to save ('csv', 'pickle', 'parquet', 'hdf5', 'both', 'compressed')
    """
    # Get number of nodes and subcluster name
    num_nodes = meta['numNodes']
    subcluster_name = meta.get('subCluster', 'main')  # Default to 'main' if not specified

    # Get subcluster index from the cluster configuration
    subcluster_idx = get_subcluster_config(cluster_config, subcluster_name)

    # Get performance ceilings from cluster configuration
    scalar_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateScalar")
    simd_flops = get_performance_value(cluster_config, subcluster_idx, "flopRateSimd")
    mem_bw = get_performance_value(cluster_config, subcluster_idx, "memoryBandwidth")

    print("=== Roofline DataFrame Configuration ===")
    print(f"Subcluster: {subcluster_name}")
    print(f"SIMD Peak Performance: {simd_flops/1e9:.2f} GFLOP/s")
    print(f"Scalar Peak Performance: {scalar_flops/1e9:.2f} GFLOP/s")
    print(f"Memory Bandwidth: {mem_bw/1e9:.2f} GB/s")
    print(f"Number of Nodes: {num_nodes}")

    # Convert to GFLOP/s for consistency
    scalar_peak_gflops = scalar_flops / 1e9
    simd_peak_gflops = simd_flops / 1e9
    mem_bw_bytes = mem_bw  # Already in bytes/s from get_performance_value

    # Calculate knee points for rooflines
    scalar_knee_intensity = scalar_flops / mem_bw_bytes  # FLOP/byte
    simd_knee_intensity = simd_flops / mem_bw_bytes  # FLOP/byte

    print(f"Scalar knee intensity: {scalar_knee_intensity:.4f} FLOP/byte")
    print(f"SIMD knee intensity: {simd_knee_intensity:.4f} FLOP/byte")
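
    # Worked example of the knee-point arithmetic above (illustrative numbers, not a
    # real machine): with simd_flops = 2e12 FLOP/s and mem_bw_bytes = 2e11 bytes/s,
    # simd_knee_intensity = 2e12 / 2e11 = 10 FLOP/byte. Below that intensity the
    # memory roof (intensity * bandwidth) is the binding ceiling; above it, the
    # compute roof is.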
print(f"\n=== Data Points ===")
|
|
print(f"{'Node':4} {'Timestep':8} {'Bandwidth (raw)':15} {'FLOPS (raw)':15} {'AI (FLOP/byte)':15}")
|
|
print("-" * 75)
|
|
|
|
# Initialize lists to collect data points
|
|
data_points = []
|
|
|
|
# Statistics for data filtering
|
|
total_data_points = 0
|
|
filtered_invalid = 0
|
|
filtered_above_roof = 0
|
|
filtered_out_of_bounds = 0
|
|
|
|
# Fixed bounds for consistency with original plotting
|
|
x_min, x_max = 0.01, 1000.0 # FLOP/byte
|
|
y_min, y_max = 1.0, 10000.0 # GFLOP/s
|
|
|
|
for node_num in range(num_nodes):
|
|
try:
|
|
# Extract data series
|
|
mem_bw_series = data.get("mem_bw", {}).get("node", {}).get("series", [])
|
|
flops_series = data.get("flops_any", {}).get("node", {}).get("series", [])
|
|
|
|
if node_num >= len(mem_bw_series) or node_num >= len(flops_series):
|
|
continue
|
|
|
|
mem_bw_data = mem_bw_series[node_num].get("data", [])
|
|
flops_data = flops_series[node_num].get("data", [])
|
|
|
|
# Ensure we have the same number of time steps for both metrics
|
|
num_timesteps = min(len(mem_bw_data), len(flops_data))
|
|
if num_timesteps == 0:
|
|
continue
|
|
|
|
total_data_points += num_timesteps
|
|
|
|
# Process each time step
|
|
for t_idx in range(num_timesteps):
|
|
# Get data values (assuming raw data is already in appropriate units)
|
|
bw_raw = mem_bw_data[t_idx] # Memory bandwidth (raw value)
|
|
flops_raw = flops_data[t_idx] # Performance in FLOP/s (raw value)
|
|
|
|
# Skip if we have invalid data (None, NaN, or non-positive values)
|
|
if (bw_raw is None or flops_raw is None or
|
|
(isinstance(bw_raw, (int, float)) and (np.isnan(bw_raw) or np.isinf(bw_raw))) or
|
|
(isinstance(flops_raw, (int, float)) and (np.isnan(flops_raw) or np.isinf(flops_raw))) or
|
|
bw_raw <= 0 or flops_raw <= 0):
|
|
filtered_invalid += 1
|
|
continue
|
|
|
|
# For arithmetic intensity calculation, we need consistent units
|
|
flops_gflops = flops_raw # Assume raw is already in GFLOP/s
|
|
bw_gb_per_sec = bw_raw # Assume raw is in GB/s
|
|
|
|
# Calculate AI in FLOP/byte: (GFLOP/s) / (GB/s) = FLOP/byte
|
|
arith_intensity = flops_gflops / bw_gb_per_sec
|
|
|
|
# Additional validation: check for reasonable ranges
|
|
# Skip if arithmetic intensity is too extreme (likely measurement error)
|
|
if arith_intensity < 1e-6 or arith_intensity > 1e6:
|
|
filtered_invalid += 1
|
|
continue
|
|
|
|
# Use raw FLOPS value for analysis (assume it's already in GFLOP/s)
|
|
performance_gflops = flops_raw
|
|
|
|
# Check if performance exceeds the theoretical maximum (above roofline)
|
|
# Use the higher of SIMD or scalar peak performance as the ceiling
|
|
max_theoretical_performance = max(simd_peak_gflops, scalar_peak_gflops)
|
|
|
|
# Also check against memory bandwidth ceiling at this intensity
|
|
mem_bw_ceiling = arith_intensity * (mem_bw_bytes / 1e9) # Convert to GFLOP/s
|
|
effective_ceiling = min(max_theoretical_performance, mem_bw_ceiling)
|
|
|
|
# Skip unrealistic data points that are significantly above the roofline
|
|
# Allow for some measurement tolerance (e.g., 5% above theoretical max)
|
|
if performance_gflops > effective_ceiling * 1.05:
|
|
filtered_above_roof += 1
|
|
continue
|
|
|
|
# Print key data points for monitoring
|
|
if t_idx % 10 == 0: # Print every 10th point to avoid spam
|
|
print(f"{node_num:4d} {t_idx:8d} {bw_raw:15.2f} {performance_gflops:15.2f} {arith_intensity:15.4f}")
|
|
|
|
# Check if points are within our fixed bounds
|
|
if (x_min <= arith_intensity <= x_max and
|
|
y_min <= performance_gflops <= y_max):
|
|
|
|
# Determine which roofline region this point falls into
|
|
if arith_intensity <= min(scalar_knee_intensity, simd_knee_intensity):
|
|
# Memory-bound region
|
|
roofline_region = "memory_bound"
|
|
theoretical_max = arith_intensity * (mem_bw_bytes / 1e9)
|
|
elif arith_intensity <= max(scalar_knee_intensity, simd_knee_intensity):
|
|
# Mixed region - determine which compute roof is limiting
|
|
if scalar_knee_intensity <= arith_intensity <= simd_knee_intensity:
|
|
roofline_region = "scalar_compute_bound"
|
|
theoretical_max = scalar_peak_gflops
|
|
else:
|
|
roofline_region = "simd_compute_bound"
|
|
theoretical_max = simd_peak_gflops
|
|
else:
|
|
# Compute-bound region
|
|
roofline_region = "compute_bound"
|
|
theoretical_max = max(simd_peak_gflops, scalar_peak_gflops)
|
|
|
|
# Calculate efficiency relative to theoretical maximum
|
|
efficiency = performance_gflops / theoretical_max if theoretical_max > 0 else 0
|
|
|
|
# Create data point dictionary
|
|
data_point = {
|
|
'node_num': node_num,
|
|
'timestep': t_idx,
|
|
'bandwidth_raw': bw_raw,
|
|
'flops_raw': flops_raw,
|
|
'performance_gflops': performance_gflops,
|
|
'arith_intensity': arith_intensity,
|
|
'roofline_region': roofline_region,
|
|
'theoretical_max_gflops': theoretical_max,
|
|
'efficiency': efficiency,
|
|
'scalar_peak_gflops': scalar_peak_gflops,
|
|
'simd_peak_gflops': simd_peak_gflops,
|
|
'memory_bw_gbs': mem_bw_bytes / 1e9,
|
|
'scalar_knee_intensity': scalar_knee_intensity,
|
|
'simd_knee_intensity': simd_knee_intensity,
|
|
'subcluster_name': subcluster_name,
|
|
'job_id': meta.get('jobId', 'unknown'),
|
|
'duration': meta.get('duration', 0)
|
|
}
|
|
|
|
data_points.append(data_point)
|
|
else:
|
|
filtered_out_of_bounds += 1
|
|
|
|
except (KeyError, IndexError, ValueError) as e:
|
|
print(f"Warning: Error processing data for node {node_num}: {e}")
|
|
continue
|
|
|
|

    # Print filtering statistics
    valid_points = len(data_points)
    print("\n=== Data Filtering Statistics ===")
    print(f"Total data points processed: {total_data_points}")
    print(f"Filtered out - Invalid/None/NaN: {filtered_invalid}")
    print(f"Filtered out - Above roofline: {filtered_above_roof}")
    print(f"Filtered out - Out of bounds: {filtered_out_of_bounds}")
    print(f"Valid data points collected: {valid_points}")
    if total_data_points > 0:
        print(f"Data retention rate: {(valid_points/total_data_points)*100:.1f}%")
    else:
        print("No data points")

    if not data_points:
        print("\nNo valid data points found within the bounds - skipping dataframe creation")
        return False, None  # Return False to indicate no dataframe was created

    # Create DataFrame from collected data points
    df = pd.DataFrame(data_points)

    # Add derived features useful for ML
    df['intensity_to_scalar_knee_ratio'] = df['arith_intensity'] / df['scalar_knee_intensity']
    df['intensity_to_simd_knee_ratio'] = df['arith_intensity'] / df['simd_knee_intensity']
    df['performance_to_scalar_peak_ratio'] = df['performance_gflops'] / df['scalar_peak_gflops']
    df['performance_to_simd_peak_ratio'] = df['performance_gflops'] / df['simd_peak_gflops']
    df['bandwidth_to_memory_bw_ratio'] = df['bandwidth_raw'] / df['memory_bw_gbs']

    # Add categorical encodings for roofline regions
    df['is_memory_bound'] = (df['roofline_region'] == 'memory_bound').astype(int)
    df['is_scalar_compute_bound'] = (df['roofline_region'] == 'scalar_compute_bound').astype(int)
    df['is_simd_compute_bound'] = (df['roofline_region'] == 'simd_compute_bound').astype(int)
    df['is_compute_bound'] = (df['roofline_region'] == 'compute_bound').astype(int)
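
    # Interpretation note (added for clarity; it follows directly from the roofline
    # definitions above): intensity_to_*_knee_ratio > 1 means the sample lies to the
    # right of that knee, i.e. on the compute-limited side of the corresponding roof,
    # while the performance_to_*_peak_ratio and bandwidth_to_memory_bw_ratio columns
    # express how close a sample is to the respective ceiling.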
print(f"\nCreated DataFrame with {len(df)} rows and {len(df.columns)} columns")
|
|
print(f"Columns: {list(df.columns)}")
|
|
|
|

    # Save DataFrame if output path is provided
    if output_path:
        # Create output directory if it doesn't exist
        output_dir = Path(output_path).parent
        output_dir.mkdir(parents=True, exist_ok=True)

        saved_files = []
        base_path = Path(output_path)

        if save_format in ['csv', 'both', 'compressed']:
            # Save as CSV
            if save_format == 'compressed':
                csv_path = base_path.with_suffix('.csv.gz')
                df.to_csv(csv_path, index=False, compression='gzip')
                print(f"  Compressed CSV: {csv_path}")
            else:
                csv_path = base_path.with_suffix('.csv')
                df.to_csv(csv_path, index=False)
                print(f"  CSV: {csv_path}")
            saved_files.append(str(csv_path))

        if save_format in ['pickle', 'both', 'compressed']:
            # Save as pickle
            if save_format == 'compressed':
                pickle_path = base_path.with_suffix('.pkl.gz')
                df.to_pickle(pickle_path, compression='gzip')
                print(f"  Compressed Pickle: {pickle_path}")
            else:
                pickle_path = base_path.with_suffix('.pkl')
                df.to_pickle(pickle_path)
                print(f"  Pickle: {pickle_path}")
            saved_files.append(str(pickle_path))

        if save_format == 'parquet':
            # Save as Parquet (compressed by default)
            try:
                parquet_path = base_path.with_suffix('.parquet')
                df.to_parquet(parquet_path, index=False)
                print(f"  Parquet: {parquet_path}")
                saved_files.append(str(parquet_path))
            except ImportError:
                print("  Warning: Parquet format requires 'pyarrow' or 'fastparquet'. Install with: pip install pyarrow")

        if save_format == 'hdf5':
            # Save as HDF5
            try:
                hdf5_path = base_path.with_suffix('.h5')
                df.to_hdf(hdf5_path, key='dataframe', mode='w', index=False)
                print(f"  HDF5: {hdf5_path}")
                saved_files.append(str(hdf5_path))
            except ImportError:
                print("  Warning: HDF5 format requires 'tables'. Install with: pip install tables")

        if saved_files:
            print(f"\nSuccessfully saved dataframe in {save_format} format(s)")
            print(f"Files saved: {', '.join(saved_files)}")
        else:
            print(f"\nWarning: No files saved. Format '{save_format}' may not be supported or required packages missing.")

        return True, df  # Return True and the dataframe
    else:
        print("\nDataFrame created but not saved (no output path provided)")
        return True, df  # Return True and the dataframe
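
# Reading a saved dataframe back (a sketch; the exact filename depends on the job id,
# timestamp, and the chosen --save-format):
#   df = pd.read_pickle("roofline_dataframes/<prefix>/<job_id>_<timestamp>_dataframe.pkl")
#   df = pd.read_csv("roofline_dataframes/<prefix>/<job_id>_<timestamp>_dataframe.csv.gz")
# pandas infers the gzip compression from the .gz extension.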

def process_single_job(args):
    """Process a single job - designed for multiprocessing"""
    job_path, cluster_config, output_base_dir, save_format = args

    try:
        # Parse job directory structure: {prefix}/{suffix}/{timestamp}
        parts = job_path.parts
        if len(parts) < 3:
            return False, f"Invalid job path structure: {job_path}"

        job_id_prefix = parts[-3]
        job_id_suffix = parts[-2]
        timestamp = parts[-1]
        job_id = job_id_prefix + job_id_suffix

        # Load job data
        data, meta = load_job_data_from_folder(job_path)

        # Check job duration - skip jobs shorter than 60 seconds
        duration = meta.get('duration', 0)
        if duration < 60:
            return False, f"Skipped job {job_id}_{timestamp}: duration {duration}s < 60s"

        # Create output directory for this job prefix
        output_dir = Path(output_base_dir) / job_id_prefix
        output_dir.mkdir(parents=True, exist_ok=True)

        # Generate dataframe
        base_filename = f"{job_id}_{timestamp}"
        dataframe_path = output_dir / f"{base_filename}_dataframe"

        success, df = create_roofline_dataframe(data, meta, cluster_config,
                                                output_path=str(dataframe_path), save_format=save_format)

        # Check if dataframe was created successfully
        if success and df is not None:
            return True, f"Processed job {job_id}_{timestamp} (duration: {duration}s, {len(df)} data points)"
        else:
            return False, f"Skipped job {job_id}_{timestamp}: no valid data to create dataframe"

    except Exception as e:
        return False, f"Error processing job {job_path}: {e}"

def process_all_jobs(base_path, output_base_dir, done_file_path="done.txt", num_processes=None, save_format='both'):
    """Process all jobs in the folder structure using multiprocessing"""
    print(f"Processing jobs from: {base_path}")
    print(f"Output directory: {output_base_dir}")
    print(f"Save format: {save_format}")

    # Load cluster configuration
    try:
        cluster_config = load_cluster_config_from_folder(base_path)
        print("Loaded cluster configuration")
    except FileNotFoundError as e:
        print(f"Error loading cluster config: {e}")
        return

    # Load already processed jobs
    done_set = load_done_list(done_file_path)
    print(f"Loaded {len(done_set)} already processed job prefixes from {done_file_path}")

    # Find all job directories
    job_dirs = find_job_directories(base_path)
    print(f"Found {len(job_dirs)} total job directories")

    # Filter out already processed jobs
    filtered_job_dirs = []
    for job_path in job_dirs:
        parts = job_path.parts
        if len(parts) >= 3:
            job_id_prefix = parts[-3]
            if job_id_prefix not in done_set:
                filtered_job_dirs.append(job_path)

    print(f"Found {len(filtered_job_dirs)} unprocessed job directories")

    if not filtered_job_dirs:
        print("No new jobs to process")
        return

    # Determine number of processes
    if num_processes is None:
        num_processes = min(12, cpu_count())  # Use up to 12 processes, capped at the available cores

    print(f"Using {num_processes} processes for parallel processing")

    # Prepare arguments for multiprocessing
    process_args = [(job_path, cluster_config, output_base_dir, save_format) for job_path in filtered_job_dirs]

    processed_count = 0
    skipped_count = 0
    processed_prefixes = set()

    # Process jobs in parallel
    if num_processes == 1:
        # Sequential processing (useful for debugging)
        results = [process_single_job(args) for args in process_args]
    else:
        # Parallel processing with a worker pool
        with Pool(num_processes) as pool:
            results = pool.map(process_single_job, process_args)

    # Collect results
    for i, (success, message) in enumerate(results):
        print(message)
        if success:
            processed_count += 1
            # Extract job prefix for done tracking
            job_path = filtered_job_dirs[i]
            parts = job_path.parts
            if len(parts) >= 3:
                job_id_prefix = parts[-3]
                processed_prefixes.add(job_id_prefix)
        else:
            skipped_count += 1

    # Update done list with newly processed prefixes
    if processed_prefixes:
        updated_done_set = done_set | processed_prefixes
        save_done_list(done_file_path, updated_done_set)
        print(f"Updated {done_file_path} with {len(processed_prefixes)} new processed prefixes")

    print(f"Completed processing {processed_count} jobs, skipped {skipped_count} jobs (duration < 60s or no valid data)")
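
# Example invocations (the script name and paths are placeholders; the flags match
# the argparse definitions in main() below, plus the --ml-example shortcut handled
# near the top of the file):
#   python this_script.py --base-path /path/to/job-archive --output-dir roofline_dataframes
#   python this_script.py --processes 1 --save-format compressed   # sequential run, gzip output
#   python this_script.py --ml-example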

def main():
    parser = argparse.ArgumentParser(description='Generate roofline dataframes from folder structure for ML training')
    parser.add_argument('--base-path', default=r'D:\fritz',
                        help='Path to job archive folder')
    parser.add_argument('--output-dir', default='roofline_dataframes',
                        help='Output directory for dataframes')
    parser.add_argument('--done-file', default='done.txt',
                        help='File to track processed job prefixes')
    parser.add_argument('--processes', type=int, default=12,
                        help='Number of parallel processes to use (default: 12)')
    parser.add_argument('--save-format', default='both',
                        choices=['csv', 'pickle', 'both', 'compressed', 'parquet', 'hdf5'],
                        help='Format to save dataframes (default: both)')

    args = parser.parse_args()

    try:
        process_all_jobs(args.base_path, args.output_dir, args.done_file, args.processes, args.save_format)
    except Exception as e:
        print(f"Error processing jobs: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()