"""
|
|
Metrics Collection and Reporting for LLM Benchmarking
|
|
|
|
Provides centralized metrics collection, aggregation, and reporting.
|
|
"""
|
|
|
|
import json
|
|
import csv
|
|
from dataclasses import dataclass, asdict, field
|
|
from typing import Dict, List, Optional, Any
|
|
from pathlib import Path
|
|
import time
|
|
|
|
|
|
@dataclass
class StageMetrics:
    """Metrics for a specific stage (e.g., forward pass, prefill, etc.)."""
    stage_name: str
    duration_ms: float
    tokens_processed: int
    tokens_per_second: float
    energy_joules: float
    energy_per_token: float
    avg_power_watts: float
    peak_memory_gb: float
    avg_gpu_util_percent: float

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

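
# Illustrative sketch (not part of the original module): how the derived
# StageMetrics fields are typically filled in from raw measurements. The
# helper name and its signature are assumptions for demonstration only.
def make_stage_metrics(stage_name: str, duration_ms: float, tokens: int,
                       energy_joules: float, avg_power_watts: float = 0.0,
                       peak_memory_gb: float = 0.0,
                       avg_gpu_util_percent: float = 0.0) -> StageMetrics:
    seconds = duration_ms / 1000.0
    return StageMetrics(
        stage_name=stage_name,
        duration_ms=duration_ms,
        tokens_processed=tokens,
        # Throughput: tokens divided by wall-clock seconds for the stage.
        tokens_per_second=tokens / seconds if seconds > 0 else 0.0,
        energy_joules=energy_joules,
        # Energy efficiency: joules divided by tokens processed in the stage.
        energy_per_token=energy_joules / tokens if tokens > 0 else 0.0,
        avg_power_watts=avg_power_watts,
        peak_memory_gb=peak_memory_gb,
        avg_gpu_util_percent=avg_gpu_util_percent,
    )
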
@dataclass
class PretrainMetrics:
    """Metrics for pretraining benchmark."""
    model_name: str
    gpu_name: str
    attention_implementation: str
    batch_size: int
    sequence_length: int
    num_steps: int

    # Stage-specific metrics
    forward: StageMetrics
    backward: StageMetrics
    optimizer: StageMetrics

    # Overall metrics
    total_duration_ms: float
    total_tokens: int
    total_tokens_per_second: float
    total_energy_joules: float
    total_energy_per_token: float

    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "model_name": self.model_name,
            "gpu_name": self.gpu_name,
            "attention_implementation": self.attention_implementation,
            "batch_size": self.batch_size,
            "sequence_length": self.sequence_length,
            "num_steps": self.num_steps,
            "forward": self.forward.to_dict(),
            "backward": self.backward.to_dict(),
            "optimizer": self.optimizer.to_dict(),
            "total_duration_ms": self.total_duration_ms,
            "total_tokens": self.total_tokens,
            "total_tokens_per_second": self.total_tokens_per_second,
            "total_energy_joules": self.total_energy_joules,
            "total_energy_per_token": self.total_energy_per_token,
            "timestamp": self.timestamp,
        }

@dataclass
class InferenceMetrics:
    """Metrics for inference benchmark."""
    model_name: str
    gpu_name: str
    attention_implementation: str
    num_requests: int
    prompt_length: int
    generation_length: int

    # Stage-specific metrics
    prefill: StageMetrics  # Time to First Token
    decode: StageMetrics   # Inter-Token Latency

    # End-to-end metrics
    e2e_latency_ms: float
    e2e_tokens_per_second: float
    e2e_energy_joules: float
    e2e_energy_per_token: float

    # Additional metrics
    ttft_ms: float  # Time to First Token (same as prefill duration)
    itl_ms: float   # Inter-Token Latency (decode duration / num_tokens)

    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "model_name": self.model_name,
            "gpu_name": self.gpu_name,
            "attention_implementation": self.attention_implementation,
            "num_requests": self.num_requests,
            "prompt_length": self.prompt_length,
            "generation_length": self.generation_length,
            "prefill": self.prefill.to_dict(),
            "decode": self.decode.to_dict(),
            "e2e_latency_ms": self.e2e_latency_ms,
            "e2e_tokens_per_second": self.e2e_tokens_per_second,
            "e2e_energy_joules": self.e2e_energy_joules,
            "e2e_energy_per_token": self.e2e_energy_per_token,
            "ttft_ms": self.ttft_ms,
            "itl_ms": self.itl_ms,
            "timestamp": self.timestamp,
        }

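
# Illustrative sketch (not part of the original module): how the TTFT, ITL and
# end-to-end latency fields relate to the prefill/decode stage measurements.
# The helper name, and the assumption that decode.tokens_processed equals the
# number of generated tokens, are ours for demonstration only.
def derive_latency_fields(prefill: StageMetrics, decode: StageMetrics):
    ttft_ms = prefill.duration_ms  # time to first token == prefill duration
    itl_ms = decode.duration_ms / max(decode.tokens_processed, 1)  # per generated token
    e2e_latency_ms = prefill.duration_ms + decode.duration_ms
    return ttft_ms, itl_ms, e2e_latency_ms
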
class MetricsCollector:
    """Collects metrics during benchmark runs."""

    def __init__(self):
        """Initialize metrics collector."""
        self.metrics_history: List[Dict[str, Any]] = []

    def add_pretrain_metrics(self, metrics: PretrainMetrics):
        """Add pretraining metrics."""
        self.metrics_history.append({
            "type": "pretrain",
            "metrics": metrics.to_dict()
        })

    def add_inference_metrics(self, metrics: InferenceMetrics):
        """Add inference metrics."""
        self.metrics_history.append({
            "type": "inference",
            "metrics": metrics.to_dict()
        })

    def get_all_metrics(self) -> List[Dict[str, Any]]:
        """Get all collected metrics."""
        return self.metrics_history

    def clear(self):
        """Clear all metrics."""
        self.metrics_history.clear()

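
# Illustrative sketch (not part of the original module): dumping everything a
# MetricsCollector has gathered to a single JSON file. The function name and
# output layout are assumptions for demonstration only.
def dump_collector(collector: MetricsCollector, output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w') as f:
        # Each entry is {"type": "pretrain" | "inference", "metrics": {...}}.
        json.dump(collector.get_all_metrics(), f, indent=2)
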
class MetricsReporter:
    """Formats and outputs benchmark results."""

    @staticmethod
    def print_pretrain_metrics(metrics: PretrainMetrics, verbose: bool = True):
        """Print pretraining metrics to console."""
        print("\n" + "=" * 80)
        print("PRETRAINING BENCHMARK RESULTS")
        print("=" * 80)
        print(f"\nModel: {metrics.model_name}")
        print(f"GPU: {metrics.gpu_name}")
        print(f"Attention: {metrics.attention_implementation}")
        print(f"Batch Size: {metrics.batch_size}")
        print(f"Sequence Length: {metrics.sequence_length}")
        print(f"Training Steps: {metrics.num_steps}")

        print("\n" + "-" * 80)
        print("STAGE BREAKDOWN")
        print("-" * 80)

        # Forward pass
        print("\n[1] FORWARD PASS")
        MetricsReporter._print_stage_metrics(metrics.forward, verbose)

        # Backward pass
        print("\n[2] BACKWARD PASS")
        MetricsReporter._print_stage_metrics(metrics.backward, verbose)

        # Optimizer step
        print("\n[3] OPTIMIZER STEP")
        MetricsReporter._print_stage_metrics(metrics.optimizer, verbose)

        # Overall
        print("\n" + "-" * 80)
        print("OVERALL METRICS")
        print("-" * 80)
        print(f"  Total Duration: {metrics.total_duration_ms:>10.2f} ms")
        print(f"  Total Tokens: {metrics.total_tokens:>10,}")
        print(f"  Throughput: {metrics.total_tokens_per_second:>10.2f} tokens/s")
        print(f"  Total Energy: {metrics.total_energy_joules:>10.2f} J")
        print(f"  Energy per Token: {metrics.total_energy_per_token*1000:>10.4f} mJ/token")
        print("=" * 80 + "\n")

    @staticmethod
    def print_inference_metrics(metrics: InferenceMetrics, verbose: bool = True):
        """Print inference metrics to console."""
        print("\n" + "=" * 80)
        print("INFERENCE BENCHMARK RESULTS")
        print("=" * 80)
        print(f"\nModel: {metrics.model_name}")
        print(f"GPU: {metrics.gpu_name}")
        print(f"Attention: {metrics.attention_implementation}")
        print(f"Requests: {metrics.num_requests}")
        print(f"Prompt Length: {metrics.prompt_length}")
        print(f"Generation Length: {metrics.generation_length}")

        print("\n" + "-" * 80)
        print("STAGE BREAKDOWN")
        print("-" * 80)

        # Prefill
        print("\n[1] PREFILL (Time to First Token)")
        MetricsReporter._print_stage_metrics(metrics.prefill, verbose)
        print(f"  TTFT: {metrics.ttft_ms:>10.2f} ms")

        # Decode
        print("\n[2] DECODE (Inter-Token Latency)")
        MetricsReporter._print_stage_metrics(metrics.decode, verbose)
        print(f"  ITL: {metrics.itl_ms:>10.2f} ms/token")

        # End-to-end
        print("\n" + "-" * 80)
        print("END-TO-END METRICS")
        print("-" * 80)
        print(f"  Request Latency: {metrics.e2e_latency_ms:>10.2f} ms")
        print(f"  Throughput: {metrics.e2e_tokens_per_second:>10.2f} tokens/s")
        print(f"  Total Energy: {metrics.e2e_energy_joules:>10.2f} J")
        print(f"  Energy per Token: {metrics.e2e_energy_per_token*1000:>10.4f} mJ/token")
        print("=" * 80 + "\n")

    @staticmethod
    def _print_stage_metrics(stage: StageMetrics, verbose: bool = True):
        """Print metrics for a single stage."""
        print(f"  Duration: {stage.duration_ms:>10.2f} ms")
        print(f"  Tokens: {stage.tokens_processed:>10,}")
        print(f"  Throughput: {stage.tokens_per_second:>10.2f} tokens/s")
        print(f"  Energy: {stage.energy_joules:>10.2f} J")
        print(f"  Energy per Token: {stage.energy_per_token*1000:>10.4f} mJ/token")

        if verbose:
            print(f"  Avg Power: {stage.avg_power_watts:>10.2f} W")
            print(f"  Peak Memory: {stage.peak_memory_gb:>10.2f} GB")
            print(f"  Avg GPU Utilization: {stage.avg_gpu_util_percent:>10.1f} %")

    @staticmethod
    def save_json(metrics: Any, output_path: Path):
        """
        Save metrics to JSON file.

        Args:
            metrics: PretrainMetrics or InferenceMetrics object
            output_path: Path to output JSON file
        """
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'w') as f:
            json.dump(metrics.to_dict(), f, indent=2)

        print(f"Metrics saved to: {output_path}")

    @staticmethod
    def save_csv(metrics_list: List[Any], output_path: Path, benchmark_type: str = "pretrain"):
        """
        Save multiple metrics to CSV file for comparison.

        Args:
            metrics_list: List of PretrainMetrics or InferenceMetrics objects
            output_path: Path to output CSV file
            benchmark_type: "pretrain" or "inference"
        """
        if not metrics_list:
            print("No metrics to save")
            return

        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'w', newline='') as f:
            if benchmark_type == "pretrain":
                MetricsReporter._save_pretrain_csv(metrics_list, f)
            else:
                MetricsReporter._save_inference_csv(metrics_list, f)

        print(f"CSV saved to: {output_path}")

    @staticmethod
    def _save_pretrain_csv(metrics_list: List[PretrainMetrics], file):
        """Save pretraining metrics to CSV."""
        fieldnames = [
            'gpu_name', 'attention_implementation', 'batch_size', 'sequence_length', 'num_steps',
            'forward_duration_ms', 'forward_tokens_per_sec', 'forward_energy_j', 'forward_energy_per_token_mj',
            'backward_duration_ms', 'backward_tokens_per_sec', 'backward_energy_j', 'backward_energy_per_token_mj',
            'optimizer_duration_ms', 'optimizer_tokens_per_sec', 'optimizer_energy_j', 'optimizer_energy_per_token_mj',
            'total_duration_ms', 'total_tokens_per_sec', 'total_energy_j', 'total_energy_per_token_mj',
            'timestamp'
        ]

        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()

        for m in metrics_list:
            writer.writerow({
                'gpu_name': m.gpu_name,
                'attention_implementation': m.attention_implementation,
                'batch_size': m.batch_size,
                'sequence_length': m.sequence_length,
                'num_steps': m.num_steps,
                'forward_duration_ms': m.forward.duration_ms,
                'forward_tokens_per_sec': m.forward.tokens_per_second,
                'forward_energy_j': m.forward.energy_joules,
                'forward_energy_per_token_mj': m.forward.energy_per_token * 1000,
                'backward_duration_ms': m.backward.duration_ms,
                'backward_tokens_per_sec': m.backward.tokens_per_second,
                'backward_energy_j': m.backward.energy_joules,
                'backward_energy_per_token_mj': m.backward.energy_per_token * 1000,
                'optimizer_duration_ms': m.optimizer.duration_ms,
                'optimizer_tokens_per_sec': m.optimizer.tokens_per_second,
                'optimizer_energy_j': m.optimizer.energy_joules,
                'optimizer_energy_per_token_mj': m.optimizer.energy_per_token * 1000,
                'total_duration_ms': m.total_duration_ms,
                'total_tokens_per_sec': m.total_tokens_per_second,
                'total_energy_j': m.total_energy_joules,
                'total_energy_per_token_mj': m.total_energy_per_token * 1000,
                'timestamp': m.timestamp,
            })

    @staticmethod
    def _save_inference_csv(metrics_list: List[InferenceMetrics], file):
        """Save inference metrics to CSV."""
        fieldnames = [
            'gpu_name', 'attention_implementation', 'num_requests', 'prompt_length', 'generation_length',
            'prefill_duration_ms', 'prefill_tokens_per_sec', 'prefill_energy_j', 'prefill_energy_per_token_mj',
            'ttft_ms',
            'decode_duration_ms', 'decode_tokens_per_sec', 'decode_energy_j', 'decode_energy_per_token_mj',
            'itl_ms',
            'e2e_latency_ms', 'e2e_tokens_per_sec', 'e2e_energy_j', 'e2e_energy_per_token_mj',
            'timestamp'
        ]

        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()

        for m in metrics_list:
            writer.writerow({
                'gpu_name': m.gpu_name,
                'attention_implementation': m.attention_implementation,
                'num_requests': m.num_requests,
                'prompt_length': m.prompt_length,
                'generation_length': m.generation_length,
                'prefill_duration_ms': m.prefill.duration_ms,
                'prefill_tokens_per_sec': m.prefill.tokens_per_second,
                'prefill_energy_j': m.prefill.energy_joules,
                'prefill_energy_per_token_mj': m.prefill.energy_per_token * 1000,
                'ttft_ms': m.ttft_ms,
                'decode_duration_ms': m.decode.duration_ms,
                'decode_tokens_per_sec': m.decode.tokens_per_second,
                'decode_energy_j': m.decode.energy_joules,
                'decode_energy_per_token_mj': m.decode.energy_per_token * 1000,
                'itl_ms': m.itl_ms,
                'e2e_latency_ms': m.e2e_latency_ms,
                'e2e_tokens_per_sec': m.e2e_tokens_per_second,
                'e2e_energy_j': m.e2e_energy_joules,
                'e2e_energy_per_token_mj': m.e2e_energy_per_token * 1000,
                'timestamp': m.timestamp,
            })

if __name__ == "__main__":
    # Test metrics reporting.
    # Create sample pretraining metrics
    forward = StageMetrics(
        stage_name="forward",
        duration_ms=100.5,
        tokens_processed=1024,
        tokens_per_second=10189.3,
        energy_joules=25.3,
        energy_per_token=0.0247,
        avg_power_watts=251.7,
        peak_memory_gb=45.2,
        avg_gpu_util_percent=95.3
    )

    backward = StageMetrics(
        stage_name="backward",
        duration_ms=205.2,
        tokens_processed=1024,
        tokens_per_second=4991.2,
        energy_joules=51.6,
        energy_per_token=0.0504,
        avg_power_watts=251.5,
        peak_memory_gb=48.6,
        avg_gpu_util_percent=97.1
    )

    optimizer = StageMetrics(
        stage_name="optimizer",
        duration_ms=15.3,
        tokens_processed=1024,
        tokens_per_second=66928.1,
        energy_joules=3.8,
        energy_per_token=0.0037,
        avg_power_watts=248.4,
        peak_memory_gb=48.6,
        avg_gpu_util_percent=42.1
    )

    pretrain_metrics = PretrainMetrics(
        model_name="Qwen/Qwen2.5-3B-Instruct",
        gpu_name="NVIDIA A100 80GB",
        attention_implementation="flash_attention_2",
        batch_size=8,
        sequence_length=2048,
        num_steps=10,
        forward=forward,
        backward=backward,
        optimizer=optimizer,
        total_duration_ms=321.0,
        total_tokens=10240,
        total_tokens_per_second=31900.3,
        total_energy_joules=80.7,
        total_energy_per_token=0.00788
    )

    # Print pretrain metrics
    MetricsReporter.print_pretrain_metrics(pretrain_metrics)

    # Create sample inference metrics
    prefill = StageMetrics(
        stage_name="prefill",
        duration_ms=45.2,
        tokens_processed=512,
        tokens_per_second=11327.4,
        energy_joules=11.3,
        energy_per_token=0.0221,
        avg_power_watts=250.0,
        peak_memory_gb=42.1,
        avg_gpu_util_percent=89.2
    )

    decode = StageMetrics(
        stage_name="decode",
        duration_ms=223.5,
        tokens_processed=100,
        tokens_per_second=447.4,
        energy_joules=55.9,
        energy_per_token=0.559,
        avg_power_watts=250.1,
        peak_memory_gb=42.1,
        avg_gpu_util_percent=62.3
    )

    inference_metrics = InferenceMetrics(
        model_name="Qwen/Qwen2.5-3B-Instruct",
        gpu_name="NVIDIA A100 80GB",
        attention_implementation="flash_attention_2",
        num_requests=10,
        prompt_length=512,
        generation_length=100,
        prefill=prefill,
        decode=decode,
        e2e_latency_ms=268.7,
        e2e_tokens_per_second=2277.9,
        e2e_energy_joules=67.2,
        e2e_energy_per_token=0.110,
        ttft_ms=45.2,
        itl_ms=2.235
    )

    # Print inference metrics
    MetricsReporter.print_inference_metrics(inference_metrics)
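
    # Illustrative only (not in the original demo): exercise the save helpers
    # as well. The temporary output directory is an assumption; real runs
    # would point these at their own results folder.
    import tempfile
    out_dir = Path(tempfile.mkdtemp(prefix="llm_bench_metrics_"))
    MetricsReporter.save_json(pretrain_metrics, out_dir / "pretrain_metrics.json")
    MetricsReporter.save_csv([pretrain_metrics], out_dir / "pretrain_metrics.csv", benchmark_type="pretrain")
    MetricsReporter.save_csv([inference_metrics], out_dir / "inference_metrics.csv", benchmark_type="inference")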