#!/usr/bin/env python3
"""
Pretraining Benchmark for LLM Performance Evaluation

Measures performance and energy metrics for pretraining workloads with
separate measurements for the forward, backward, and optimizer stages.
"""

import argparse
import os
import sys
import time
from pathlib import Path
from typing import Optional

import torch
from transformers import AutoModelForCausalLM
from tqdm import tqdm

# Add utils to path
sys.path.insert(0, str(Path(__file__).parent))

from utils.gpu_monitor import get_gpu_monitor
from utils.metrics import StageMetrics, PretrainMetrics, MetricsReporter
from utils.attention import (
    get_default_attention,
    configure_model_attention,
    validate_attention_for_gpu,
)
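

# The helper below is a sketch added for illustration; the benchmark does not
# call it. It uses the common ~6 * params * tokens approximation for dense
# transformer training FLOPs (~2N per token for the forward pass, ~4N for the
# backward pass) and ignores attention FLOPs, so it understates compute at
# long sequence lengths.
def estimate_training_tflops(num_params: int, tokens: int, seconds: float) -> float:
    """Approximate achieved training TFLOPS via the 6 * N * T heuristic."""
    if seconds <= 0:
        return 0.0
    return 6.0 * num_params * tokens / seconds / 1e12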


def benchmark_pretrain(
    model_name_or_path: str,
    attn_implementation: str = "auto",
    batch_size: int = 8,
    sequence_length: int = 2048,
    num_steps: int = 10,
    warmup_steps: int = 3,
    device: str = "cuda",
    device_id: int = 0,
    output_dir: Optional[str] = None,
    verbose: bool = True,
):
    """
    Run pretraining benchmark.

    Args:
        model_name_or_path: Path to model or HuggingFace identifier
        attn_implementation: Attention implementation to use
        batch_size: Batch size for training
        sequence_length: Sequence length
        num_steps: Number of training steps to measure
        warmup_steps: Number of warmup steps before measurement
        device: Device to use
        device_id: GPU device ID
        output_dir: Directory to save results
        verbose: Print verbose output

    Returns:
        PretrainMetrics with per-stage and aggregate results.
    """
    print("=" * 80)
    print("PRETRAINING BENCHMARK")
    print("=" * 80)

    # Initialize GPU monitor
    if verbose:
        print("\n[1/6] Initializing GPU monitor...")
    monitor = get_gpu_monitor(device_id)
    gpu_name = monitor.get_device_name()
    if verbose:
        print(f"  GPU: {gpu_name}")

    # Determine attention implementation
    if attn_implementation == "auto":
        attn_implementation = get_default_attention(gpu_name)
        if verbose:
            print(f"  Auto-selected attention: {attn_implementation}")

    # Validate attention for GPU
    valid, warning = validate_attention_for_gpu(attn_implementation, gpu_name)
    if warning and verbose:
        print(f"  ⚠ {warning}")

    # Load model
    if verbose:
        print(f"\n[2/6] Loading model: {model_name_or_path}")

    # FA3 models are loaded with the FA2 kernel and patched afterwards
    load_attn = (
        "flash_attention_2"
        if attn_implementation in ["flash_attention_2", "flash_attention_3_hopper"]
        else attn_implementation
    )

    try:
        model = AutoModelForCausalLM.from_pretrained(
            model_name_or_path,
            torch_dtype=torch.bfloat16,
            attn_implementation=load_attn,
            trust_remote_code=True,
        )
        model = model.to(device)

        # Configure attention (patch if needed for FA3)
        model = configure_model_attention(model, attn_implementation, verbose=verbose)

        if verbose:
            total_params = sum(p.numel() for p in model.parameters())
            trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
            print(f"  Total parameters: {total_params:,} ({total_params / 1e9:.2f}B)")
            print(f"  Trainable parameters: {trainable_params:,}")
    except Exception as e:
        print(f"✗ Error loading model: {e}")
        sys.exit(1)

    # Setup optimizer
    if verbose:
        print("\n[3/6] Setting up optimizer...")
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    # Generate synthetic training data
    if verbose:
        print("\n[4/6] Generating synthetic training data...")
        print(f"  Batch size: {batch_size}")
        print(f"  Sequence length: {sequence_length}")

    # Random input_ids stand in for real training batches
    vocab_size = model.config.vocab_size
    input_ids = torch.randint(0, vocab_size, (batch_size, sequence_length), device=device)
    labels = input_ids.clone()

    # Warmup
    if verbose:
        print(f"\n[5/6] Running warmup ({warmup_steps} steps)...")
    model.train()
    for _ in range(warmup_steps):
        optimizer.zero_grad()
        outputs = model(input_ids=input_ids, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()

    # Synchronize before benchmarking
    torch.cuda.synchronize()

    # Benchmark
    if verbose:
        print(f"\n[6/6] Running benchmark ({num_steps} steps)...")

    # Storage for per-step metrics
    forward_times = []
    backward_times = []
    optimizer_times = []
    forward_energies = []
    backward_energies = []
    optimizer_energies = []
    forward_powers = []
    backward_powers = []
    optimizer_powers = []
    memory_usage = []
    gpu_utils = []

    total_tokens = batch_size * sequence_length * num_steps

    for _ in tqdm(range(num_steps), desc="Benchmarking"):
        # === FORWARD PASS (zero_grad is grouped into this stage's timing) ===
        monitor.start_monitoring()
        torch.cuda.synchronize()
        start_time = time.perf_counter()

        optimizer.zero_grad()
        outputs = model(input_ids=input_ids, labels=labels)
        loss = outputs.loss

        torch.cuda.synchronize()
        forward_time = time.perf_counter() - start_time
        forward_energy = monitor.get_energy_consumed()
        forward_power = monitor.get_average_power()

        forward_times.append(forward_time * 1000)  # convert to ms
        forward_energies.append(forward_energy)
        forward_powers.append(forward_power)

        # === BACKWARD PASS ===
        monitor.start_monitoring()
        torch.cuda.synchronize()
        start_time = time.perf_counter()

        loss.backward()

        torch.cuda.synchronize()
        backward_time = time.perf_counter() - start_time
        backward_energy = monitor.get_energy_consumed()
        backward_power = monitor.get_average_power()

        backward_times.append(backward_time * 1000)  # convert to ms
        backward_energies.append(backward_energy)
        backward_powers.append(backward_power)

        # === OPTIMIZER STEP ===
        monitor.start_monitoring()
        torch.cuda.synchronize()
        start_time = time.perf_counter()

        optimizer.step()

        torch.cuda.synchronize()
        optimizer_time = time.perf_counter() - start_time
        optimizer_energy = monitor.get_energy_consumed()
        optimizer_power = monitor.get_average_power()

        optimizer_times.append(optimizer_time * 1000)  # convert to ms
        optimizer_energies.append(optimizer_energy)
        optimizer_powers.append(optimizer_power)

        # Get memory and utilization ("step_metrics" avoids shadowing the
        # final PretrainMetrics object created below)
        step_metrics = monitor.get_metrics()
        memory_usage.append(step_metrics.memory_used_gb)
        gpu_utils.append(step_metrics.gpu_utilization_percent)
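    # Illustrative (commented-out) use of the estimate_training_tflops sketch
    # defined at module level. total_params is only bound when verbose=True,
    # so capture it unconditionally before enabling this:
    #   wall_s = (sum(forward_times) + sum(backward_times) + sum(optimizer_times)) / 1000
    #   print(f"  Approx. achieved TFLOPS: "
    #         f"{estimate_training_tflops(total_params, total_tokens, wall_s):.1f}")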
    # Compute aggregated metrics
    tokens_per_step = batch_size * sequence_length

    # Forward metrics
    forward_duration_ms = sum(forward_times)
    forward_energy_j = sum(forward_energies)
    forward_tokens = tokens_per_step * num_steps
    forward_tps = forward_tokens / (forward_duration_ms / 1000)
    forward_ept = forward_energy_j / forward_tokens

    forward_metrics = StageMetrics(
        stage_name="forward",
        duration_ms=forward_duration_ms,
        tokens_processed=forward_tokens,
        tokens_per_second=forward_tps,
        energy_joules=forward_energy_j,
        energy_per_token=forward_ept,
        avg_power_watts=sum(forward_powers) / len(forward_powers),
        peak_memory_gb=max(memory_usage),
        avg_gpu_util_percent=sum(gpu_utils) / len(gpu_utils),
    )

    # Backward metrics
    backward_duration_ms = sum(backward_times)
    backward_energy_j = sum(backward_energies)
    backward_tokens = tokens_per_step * num_steps
    backward_tps = backward_tokens / (backward_duration_ms / 1000)
    backward_ept = backward_energy_j / backward_tokens

    backward_metrics = StageMetrics(
        stage_name="backward",
        duration_ms=backward_duration_ms,
        tokens_processed=backward_tokens,
        tokens_per_second=backward_tps,
        energy_joules=backward_energy_j,
        energy_per_token=backward_ept,
        avg_power_watts=sum(backward_powers) / len(backward_powers),
        peak_memory_gb=max(memory_usage),
        avg_gpu_util_percent=sum(gpu_utils) / len(gpu_utils),
    )

    # Optimizer metrics
    optimizer_duration_ms = sum(optimizer_times)
    optimizer_energy_j = sum(optimizer_energies)
    optimizer_tokens = tokens_per_step * num_steps
    optimizer_tps = optimizer_tokens / (optimizer_duration_ms / 1000)
    optimizer_ept = optimizer_energy_j / optimizer_tokens

    optimizer_metrics = StageMetrics(
        stage_name="optimizer",
        duration_ms=optimizer_duration_ms,
        tokens_processed=optimizer_tokens,
        tokens_per_second=optimizer_tps,
        energy_joules=optimizer_energy_j,
        energy_per_token=optimizer_ept,
        avg_power_watts=sum(optimizer_powers) / len(optimizer_powers),
        peak_memory_gb=max(memory_usage),
        avg_gpu_util_percent=sum(gpu_utils) / len(gpu_utils),
    )

    # Overall metrics
    total_duration_ms = forward_duration_ms + backward_duration_ms + optimizer_duration_ms
    total_energy_j = forward_energy_j + backward_energy_j + optimizer_energy_j
    total_tps = total_tokens / (total_duration_ms / 1000)
    total_ept = total_energy_j / total_tokens

    # Create metrics object
    metrics = PretrainMetrics(
        model_name=model_name_or_path,
        gpu_name=gpu_name,
        attention_implementation=attn_implementation,
        batch_size=batch_size,
        sequence_length=sequence_length,
        num_steps=num_steps,
        forward=forward_metrics,
        backward=backward_metrics,
        optimizer=optimizer_metrics,
        total_duration_ms=total_duration_ms,
        total_tokens=total_tokens,
        total_tokens_per_second=total_tps,
        total_energy_joules=total_energy_j,
        total_energy_per_token=total_ept,
    )

    # Print results
    MetricsReporter.print_pretrain_metrics(metrics, verbose=verbose)

    # Save results
    if output_dir:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Save JSON
        json_path = output_path / f"pretrain_{gpu_name.replace(' ', '_')}_{attn_implementation}.json"
        MetricsReporter.save_json(metrics, json_path)

    # Cleanup
    monitor.cleanup()
    del model
    torch.cuda.empty_cache()

    return metrics


def main():
    parser = argparse.ArgumentParser(
        description="LLM Pretraining Benchmark",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--model-path",
        type=str,
        default="./model_cache",
        help="Path to cached model",
    )
    parser.add_argument(
        "--model-name",
        type=str,
        default="Qwen/Qwen3-4B",
        help="Model name (for reporting)",
    )
    parser.add_argument(
        "--attn-implementation",
        type=str,
        default="auto",
        choices=["auto", "flash_attention_2", "flash_attention_3_hopper", "sdpa", "eager"],
        help="Attention implementation to use",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=8,
        help="Batch size",
    )
    parser.add_argument(
        "--sequence-length",
        type=int,
        default=8192,
        help="Sequence length",
    )
    parser.add_argument(
        "--num-steps",
        type=int,
        default=10,
        help="Number of training steps",
    )
    parser.add_argument(
        "--warmup-steps",
        type=int,
        default=3,
        help="Number of warmup steps",
    )
    parser.add_argument(
        "--device-id",
        type=int,
        default=0,
        help="GPU device ID",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="./results",
        help="Output directory for results",
    )

    args = parser.parse_args()

    # Point the HuggingFace cache at the local model directory. Note:
    # huggingface_hub may resolve cache paths when it is imported, so setting
    # HF_HOME this late can be a no-op; exporting it before launching the
    # script is more reliable.
    if Path(args.model_path).exists():
        os.environ["HF_HOME"] = args.model_path

    benchmark_pretrain(
        model_name_or_path=args.model_name,
        attn_implementation=args.attn_implementation,
        batch_size=args.batch_size,
        sequence_length=args.sequence_length,
        num_steps=args.num_steps,
        warmup_steps=args.warmup_steps,
        device="cuda",
        device_id=args.device_id,
        output_dir=args.output_dir,
        verbose=True,
    )


if __name__ == "__main__":
    main()
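

# Example invocation (illustrative; assumes this file is saved as
# pretrain_benchmark.py and a CUDA GPU is available):
#   python pretrain_benchmark.py --model-name Qwen/Qwen3-4B \
#       --attn-implementation auto --batch-size 4 --sequence-length 4096 \
#       --num-steps 20 --output-dir ./results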