# slurm-application-detection…/xgb_local.py
import pandas as pd
import numpy as np
import joblib
import json
from typing import Dict, List, Tuple, Union, Optional
import warnings
warnings.filterwarnings('ignore')
from scipy import stats
def compute_mad(data: np.ndarray) -> float:
"""Compute Median Absolute Deviation."""
median = np.median(data)
mad = np.median(np.abs(data - median))
return mad
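# Illustrative example (values not from the original file): for [1, 2, 3, 4, 100]
# the median is 3, the absolute deviations are [2, 1, 0, 1, 97], and their median
# is 1.0, so a single outlier barely affects the spread estimate:
#   compute_mad(np.array([1, 2, 3, 4, 100]))  # -> 1.0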
def df_aggregate(json_str: str, job_id_full: Optional[str] = None) -> Dict:
"""
Aggregate roofline data from JSON string into a single feature vector.
Args:
json_str: JSON string containing roofline data records
job_id_full: Optional job ID to include in the result
Returns:
Dictionary containing aggregated features
"""
# Parse JSON string to DataFrame
try:
data = json.loads(json_str)
if isinstance(data, list):
df = pd.DataFrame(data)
elif isinstance(data, dict):
df = pd.DataFrame([data])
else:
raise ValueError("JSON must contain a list of objects or a single object")
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON string: {e}")
# Group data by node_num
if 'node_num' not in df.columns:
# If no node_num, treat all data as single node
df['node_num'] = 1
grouped = df.groupby('node_num')
all_features = []
for node_num, group in grouped:
features = {
'node_num': int(node_num)
}
if job_id_full is not None:
features['job_id'] = job_id_full
# Compute statistics for key metrics
for axis in ['bandwidth_raw', 'flops_raw', 'arith_intensity']:
values = group[axis].values
# Compute percentiles
p10 = np.percentile(values, 10)
p50 = np.median(values)
p90 = np.percentile(values, 90)
# Compute MAD (more robust than variance)
mad = compute_mad(values)
# Store features
features[f'{axis}_p10'] = p10
features[f'{axis}_median'] = p50
features[f'{axis}_p90'] = p90
features[f'{axis}_mad'] = mad
features[f'{axis}_range'] = p90 - p10
features[f'{axis}_iqr'] = np.percentile(values, 75) - np.percentile(values, 25)
# Compute covariance and correlation between bandwidth_raw and flops_raw
if len(group) > 1: # Need at least 2 points for correlation
cov = np.cov(group['bandwidth_raw'], group['flops_raw'])[0, 1]
features['bw_flops_covariance'] = cov
corr, _ = stats.pearsonr(group['bandwidth_raw'], group['flops_raw'])
features['bw_flops_correlation'] = corr
# Additional useful features for the classifier
# Performance metrics
features['avg_performance_gflops'] = group['performance_gflops'].mean()
features['median_performance_gflops'] = group['performance_gflops'].median()
features['performance_gflops_mad'] = compute_mad(group['performance_gflops'].values)
# # Efficiency metrics
# features['avg_efficiency'] = group['efficiency'].mean()
# features['median_efficiency'] = group['efficiency'].median()
# features['efficiency_mad'] = compute_mad(group['efficiency'].values)
# features['efficiency_p10'] = np.percentile(group['efficiency'].values, 10)
# features['efficiency_p90'] = np.percentile(group['efficiency'].values, 90)
# # Distribution of roofline regions (memory-bound vs compute-bound)
# if 'roofline_region' in group.columns:
# region_counts = group['roofline_region'].value_counts(normalize=True).to_dict()
# for region, ratio in region_counts.items():
# features[f'region_{region}_ratio'] = ratio
# System characteristics
if 'memory_bw_gbs' in group.columns:
features['avg_memory_bw_gbs'] = group['memory_bw_gbs'].mean()
if 'scalar_peak_gflops' in group.columns:
features['scalar_peak_gflops'] = group['scalar_peak_gflops'].iloc[0]
if 'simd_peak_gflops' in group.columns:
features['simd_peak_gflops'] = group['simd_peak_gflops'].iloc[0]
# # Subcluster information if available
# if 'subcluster_name' in group.columns and not group['subcluster_name'].isna().all():
# features['subcluster_name'] = group['subcluster_name'].iloc[0]
# Duration information
if 'duration' in group.columns:
features['duration'] = group['duration'].iloc[0]
all_features.append(features)
# Return the first node's features; if multiple nodes are present, only the
# first is used for now (averaging across nodes could be added later)
return all_features[0]
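# Illustrative usage sketch (input values are assumptions, not from the original file):
#   sample = json.dumps([
#       {"node_num": 1, "bandwidth_raw": 150.0, "flops_raw": 2500.0,
#        "arith_intensity": 16.7, "performance_gflops": 1200.0},
#   ])
#   feats = df_aggregate(sample, job_id_full="job_001")  # "job_001" is a made-up ID
#   # feats then contains keys such as 'bandwidth_raw_median', 'flops_raw_mad',
#   # 'arith_intensity_iqr', 'avg_performance_gflops', 'node_num', and 'job_id'.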
class XGBoostMultiLabelPredictor:
"""
Python API for XGBoost multi-label classification inference.
Provides methods to load a trained model and perform inference with
confidence scores for each class.
"""
def __init__(self, model_path: str = 'xgb_model.joblib'):
"""
Initialize the predictor by loading the trained model.
Args:
model_path: Path to the saved model file (.joblib)
"""
self.model_data = None
self.model = None
self.mlb = None
self.feature_columns = None
self.n_features = 0
self.classes = []
self.load_model(model_path)
def load_model(self, model_path: str) -> None:
"""
Load the trained XGBoost model from disk.
Args:
model_path: Path to the saved model file
"""
try:
print(f"Loading model from {model_path}...")
self.model_data = joblib.load(model_path)
self.model = self.model_data['model']
self.mlb = self.model_data['mlb']
self.feature_columns = self.model_data['feature_columns']
self.classes = list(self.mlb.classes_)
self.n_features = len(self.feature_columns)
print("Model loaded successfully!")
print(f" - {len(self.classes)} classes: {self.classes}")
print(f" - {self.n_features} features: {self.feature_columns[:5]}...")
print(f" - Model type: {type(self.model).__name__}")
except Exception as e:
raise ValueError(f"Failed to load model from {model_path}: {e}")
def predict(self, features: Union[pd.DataFrame, np.ndarray, List, Dict, str],
threshold: float = 0.5,
return_all_probabilities: bool = True,
is_json: bool = False,
job_id: Optional[str] = None) -> Dict:
"""
Perform multi-label prediction on input features.
Args:
features: Input features in various formats:
- pandas DataFrame
- numpy array (2D)
- list of lists/dicts
- single feature vector (list/dict)
- JSON string (if is_json=True): roofline data to aggregate
threshold: Probability threshold for binary classification (0.0-1.0)
return_all_probabilities: If True, return probabilities for all classes.
If False, return only classes above threshold.
is_json: If True, treat features as JSON string of roofline data
job_id: Optional job ID (used when is_json=True)
Returns:
Dictionary containing:
- 'predictions': List of predicted class names
- 'probabilities': Dict of {class_name: probability} for all classes
- 'confidences': Dict of {class_name: confidence_score} for predicted classes
- 'threshold': The threshold used
"""
# If input is JSON string, aggregate features first
if is_json:
if not isinstance(features, str):
raise ValueError("When is_json=True, features must be a JSON string")
features = df_aggregate(features, job_id_full=job_id)
# Convert input to proper format
X = self._prepare_features(features)
# Get probability predictions
probabilities = self.model.predict_proba(X)
# Convert to class probabilities
class_probabilities = {}
for i, class_name in enumerate(self.classes):
# For OneVsRest, predict_proba returns shape (n_samples, n_classes)
# Each column i contains probabilities for class i
if isinstance(probabilities, list):
# List of per-label probability arrays (one array per class)
prob_array = probabilities[i]
prob_positive = prob_array[0] if hasattr(prob_array, '__getitem__') else float(prob_array)
else:
# 2D numpy array (single sample or batch)
if len(probabilities.shape) == 2:
# Shape: (n_samples, n_classes)
prob_positive = float(probabilities[0, i])
else:
# 1D array
prob_positive = float(probabilities[i])
class_probabilities[class_name] = prob_positive
# Apply threshold for predictions
predictions = []
confidences = {}
for class_name, prob in class_probabilities.items():
if prob >= threshold:
predictions.append(class_name)
# Confidence score: distance from threshold as percentage
confidence = min(1.0, (prob - threshold) / (1.0 - threshold)) * 100
confidences[class_name] = round(confidence, 2)
# Sort predictions by probability
predictions.sort(key=lambda x: class_probabilities[x], reverse=True)
result = {
'predictions': predictions,
'probabilities': {k: round(v, 4) for k, v in class_probabilities.items()},
'confidences': confidences,
'threshold': threshold
}
if not return_all_probabilities:
result['probabilities'] = {k: v for k, v in result['probabilities'].items()
if k in predictions}
return result
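# Worked example of the confidence score computed above (numbers are illustrative):
# with threshold = 0.5 and a class probability of 0.8, the confidence is
# min(1.0, (0.8 - 0.5) / (1.0 - 0.5)) * 100 = 60.0, i.e. the probability sits
# 60% of the way from the threshold to 1.0.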
def predict_top_k(self, features: Union[pd.DataFrame, np.ndarray, List, Dict, str],
k: int = 5,
is_json: bool = False,
job_id: Optional[str] = None) -> Dict:
"""
Get top-k predictions with their probabilities.
Args:
features: Input features (various formats) or JSON string if is_json=True
k: Number of top predictions to return
is_json: If True, treat features as JSON string of roofline data
job_id: Optional job ID (used when is_json=True)
Returns:
Dictionary with top-k predictions and their details
"""
# If input is JSON string, aggregate features first
if is_json:
if not isinstance(features, str):
raise ValueError("When is_json=True, features must be a JSON string")
features = df_aggregate(features, job_id_full=job_id)
# Get all probabilities
X = self._prepare_features(features)
probabilities = self.model.predict_proba(X)
class_probabilities = {}
for i, class_name in enumerate(self.classes):
# For OneVsRest, predict_proba returns shape (n_samples, n_classes)
# Each column i contains probabilities for class i
if isinstance(probabilities, list):
# List of per-label probability arrays (one array per class)
prob_array = probabilities[i]
prob_positive = prob_array[0] if hasattr(prob_array, '__getitem__') else float(prob_array)
else:
# 2D numpy array (single sample or batch)
if len(probabilities.shape) == 2:
# Shape: (n_samples, n_classes)
prob_positive = float(probabilities[0, i])
else:
# 1D array
prob_positive = float(probabilities[i])
class_probabilities[class_name] = prob_positive
# Sort by probability
sorted_classes = sorted(class_probabilities.items(),
key=lambda x: x[1], reverse=True)
top_k_classes = sorted_classes[:k]
return {
'top_predictions': [cls for cls, _ in top_k_classes],
'top_probabilities': {cls: round(prob, 4) for cls, prob in top_k_classes},
'all_probabilities': {cls: round(prob, 4) for cls, prob in class_probabilities.items()}
}
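# Illustrative result shape for predict_top_k(..., k=2) (class names and
# probabilities are made up; the real labels come from the trained model):
#   {'top_predictions': ['gromacs', 'lammps'],
#    'top_probabilities': {'gromacs': 0.91, 'lammps': 0.42},
#    'all_probabilities': {...}}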
def _prepare_features(self, features: Union[pd.DataFrame, np.ndarray, List, Dict]) -> pd.DataFrame:
"""
Convert various input formats to the expected feature format.
Args:
features: Input features in various formats
Returns:
pandas DataFrame with correct columns and order
"""
if isinstance(features, pd.DataFrame):
df = features.copy()
elif isinstance(features, np.ndarray):
if features.ndim == 1:
features = features.reshape(1, -1)
df = pd.DataFrame(features, columns=self.feature_columns[:features.shape[1]])
elif isinstance(features, list):
if not features:
raise ValueError("Empty feature list")
if isinstance(features[0], dict):
# List of dictionaries
df = pd.DataFrame(features)
else:
# List of lists
df = pd.DataFrame(features, columns=self.feature_columns[:len(features[0])])
elif isinstance(features, dict):
# Single feature dictionary
df = pd.DataFrame([features])
else:
raise ValueError(f"Unsupported feature format: {type(features)}")
# Ensure correct column order and fill missing columns with 0
for col in self.feature_columns:
if col not in df.columns:
df[col] = 0.0
df = df[self.feature_columns]
# Validate feature count
if df.shape[1] != self.n_features:
raise ValueError(f"Expected {self.n_features} features, got {df.shape[1]}")
return df
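# Example of the column handling above (hypothetical feature names): if the model
# expects ['node_num', 'bandwidth_raw_median', 'flops_raw_median'] and the caller
# passes {'node_num': 2, 'flops_raw_median': 2500.0}, the missing
# 'bandwidth_raw_median' column is filled with 0.0 and the columns are reordered
# to match self.feature_columns before prediction.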
def get_class_info(self) -> Dict:
"""
Get information about available classes.
Returns:
Dictionary with class information
"""
return {
'classes': self.classes,
'n_classes': len(self.classes),
'feature_columns': self.feature_columns,
'n_features': self.n_features
}
def batch_predict(self, features_list: List[Union[pd.DataFrame, np.ndarray, List, Dict, str]],
threshold: float = 0.5,
is_json: bool = False,
job_ids: Optional[List[str]] = None) -> List[Dict]:
"""
Perform batch prediction on multiple samples.
Args:
features_list: List of feature inputs (or JSON strings if is_json=True)
threshold: Probability threshold
is_json: If True, treat each item in features_list as JSON string
job_ids: Optional list of job IDs (used when is_json=True)
Returns:
List of prediction results
"""
results = []
for idx, features in enumerate(features_list):
try:
job_id = job_ids[idx] if job_ids and idx < len(job_ids) else None
result = self.predict(features, threshold=threshold, is_json=is_json, job_id=job_id)
results.append(result)
except Exception as e:
results.append({'error': str(e)})
return results
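# Minimal end-to-end usage sketch (assumes a trained model saved at the default
# path 'xgb_model.joblib'; the feature values shown are placeholders):
#   predictor = XGBoostMultiLabelPredictor('xgb_model.joblib')
#   result = predictor.predict({'node_num': 1, 'bandwidth_raw_median': 150.0},
#                              threshold=0.5)
#   print(result['predictions'], result['confidences'])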
def create_sample_data(n_samples: int = 5) -> List[Dict]:
"""
Create sample feature data for testing.
Args:
n_samples: Number of sample feature vectors to create
Returns:
List of feature dictionaries
"""
np.random.seed(42)
# Load feature columns from model if available
try:
model_data = joblib.load('xgb_model.joblib')
feature_columns = model_data['feature_columns']
except Exception:
# Fallback to some default features
feature_columns = [
'node_num', 'bandwidth_raw_p10', 'bandwidth_raw_median',
'bandwidth_raw_p90', 'bandwidth_raw_mad', 'bandwidth_raw_range',
'bandwidth_raw_iqr', 'flops_raw_p10', 'flops_raw_median',
'flops_raw_p90', 'flops_raw_mad', 'flops_raw_range'
]
samples = []
for _ in range(n_samples):
sample = {}
for col in feature_columns:
if 'bandwidth' in col:
sample[col] = np.random.uniform(50, 500)
elif 'flops' in col:
sample[col] = np.random.uniform(100, 5000)
elif 'node_num' in col:
sample[col] = np.random.randint(1, 16)
else:
sample[col] = np.random.uniform(0, 1000)
samples.append(sample)
return samples
if __name__ == "__main__":
print("XGBoost Multi-Label Inference API")
print("=" * 40)
# Initialize predictor
try:
predictor = XGBoostMultiLabelPredictor()
except Exception as e:
print(f"Error loading model: {e}")
raise SystemExit(1)
# Example usage of df_aggregate with JSON string
print("\n=== Example 0: JSON Aggregation ===")
sample_json = json.dumps([
{
"node_num": 1,
"bandwidth_raw": 150.5,
"flops_raw": 2500.0,
"arith_intensity": 16.6,
"performance_gflops": 1200.0,
"memory_bw_gbs": 450,
"scalar_peak_gflops": 600,
"duration": 3600
},
{
"node_num": 2,
"bandwidth_raw": 155.2,
"flops_raw": 2600.0,
"arith_intensity": 16.8,
"performance_gflops": 1250.0,
"memory_bw_gbs": 450,
"scalar_peak_gflops": 600,
"duration": 3600
}
])
try:
aggregated_features = df_aggregate(sample_json, job_id_full="test_job_123")
print(f"Aggregated features from JSON:")
for key, value in list(aggregated_features.items())[:10]:
print(f" {key}: {value}")
print(f" ... ({len(aggregated_features)} total features)")
# Use aggregated features for prediction
result = predictor.predict(aggregated_features, threshold=0.3)
print(f"\nPredictions from aggregated data: {result['predictions'][:3]}")
except Exception as e:
print(f"Error in aggregation: {e}")
# Create sample data
print("\n=== Generating sample data for other examples ===")
sample_data = create_sample_data(3)
# Example 1: Single prediction
print("\n=== Example 1: Single Prediction ===")
result = predictor.predict(sample_data[0], threshold=0.3)
print(f"Predictions: {result['predictions']}")
print(f"Confidences: {result['confidences']}")
print(f"Top probabilities:")
for class_name, prob in sorted(result['probabilities'].items(),
key=lambda x: x[1], reverse=True)[:5]:
print(".4f")
# Example 2: Top-K predictions
print("\n=== Example 2: Top-5 Predictions ===")
top_result = predictor.predict_top_k(sample_data[1], k=5)
for i, class_name in enumerate(top_result['top_predictions'], 1):
prob = top_result['top_probabilities'][class_name]
print(f"{i}. {class_name}: {prob:.4f}")
# Example 3: Batch prediction
print("\n=== Example 3: Batch Prediction ===")
batch_results = predictor.batch_predict(sample_data, threshold=0.4)
for i, result in enumerate(batch_results, 1):
if 'error' not in result:
print(f"Sample {i}: {len(result['predictions'])} predictions")
else:
print(f"Sample {i}: Error - {result['error']}")
print("\nAPI ready for use!")
print("Usage:")
print(" predictor = XGBoostMultiLabelPredictor()")
print(" result = predictor.predict(your_features)")
print(" top_k = predictor.predict_top_k(your_features, k=5)")