Recommendation System A/B Testing Implementation - Statistical Rigor & Guardrail Design
This is a follow-up to the article published this morning.
Base article: Sora App Recommendation Algorithm Implementation
Target Audience: Intermediate to advanced engineers with recommendation system implementation experience
Goals
- Master sample size calculation and statistical testing implementation
- Understand risk management framework through guardrail metrics
- Learn practical patterns for bucket allocation and traffic management
Architecture Overview
A/B testing for recommendation systems requires more than simple random assignment. You must balance statistical power, multi-metric monitoring, and user experience protection.
graph TD
    A[Experiment Design] --> B[Sample Size Calculation]
    B --> C[Bucket Allocation]
    C --> D[Traffic Distribution]
    D --> E[Guardrail Monitoring]
    E --> F{Anomaly Detection}
    F -->|Normal| G[Continue Data Collection]
    F -->|Anomaly| H[Auto-Stop]
    G --> I[Statistical Testing]
    I --> J[Decision Making]

Implementation Steps
Step 1: Sample Size Calculation
Power Analysis Implementation:
import numpy as np
from scipy import stats


def calculate_sample_size(
    baseline_rate: float,
    mde: float,  # Minimum Detectable Effect (absolute difference)
    alpha: float = 0.05,
    power: float = 0.80
) -> int:
    """
    Sample size calculation for binomial metrics (two-sided test)

    Args:
        baseline_rate: Baseline conversion rate (e.g., 0.05 = 5%)
        mde: Minimum detectable effect (e.g., 0.01 = 1pp)
        alpha: Type I error rate (significance level)
        power: Statistical power (1 - Type II error rate)

    Returns:
        Required sample size per group
    """
    # Calculate Z-scores
    z_alpha = stats.norm.ppf(1 - alpha / 2)
    z_beta = stats.norm.ppf(power)

    # Expected values
    p1 = baseline_rate
    p2 = baseline_rate + mde
    p_pooled = (p1 + p2) / 2

    # Sample size formula (normal approximation for two proportions)
    numerator = (z_alpha * np.sqrt(2 * p_pooled * (1 - p_pooled)) +
                 z_beta * np.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    denominator = (p2 - p1) ** 2
    n = int(np.ceil(numerator / denominator))
    return n


# Example execution
n_per_group = calculate_sample_size(
    baseline_rate=0.05,  # Current creation rate 5%
    mde=0.01,            # Want to detect 1pp improvement
    alpha=0.05,
    power=0.80
)
print(f"Users needed per group: {n_per_group:,}")
# Output example: Users needed per group: 8,158
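Because the required sample grows with the inverse square of the effect size, the choice of MDE dominates experiment duration. The sketch below re-implements the same formula with only the standard library (`statistics.NormalDist` in place of SciPy) and sweeps a few MDE values. The function name `sample_size_per_group` and the MDE values are illustrative assumptions, not part of the original implementation.

```python
from math import ceil, sqrt
from statistics import NormalDist


def sample_size_per_group(baseline_rate, mde, alpha=0.05, power=0.80):
    """Same normal-approximation formula as above, standard library only."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)
    z_beta = NormalDist().inv_cdf(power)
    p1, p2 = baseline_rate, baseline_rate + mde
    p_bar = (p1 + p2) / 2
    numerator = (z_alpha * sqrt(2 * p_bar * (1 - p_bar))
                 + z_beta * sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    return ceil(numerator / (p2 - p1) ** 2)


# Hypothetical MDE values (absolute differences) to compare
for mde in (0.02, 0.01, 0.005):
    n = sample_size_per_group(baseline_rate=0.05, mde=mde)
    print(f"MDE {mde:.3f}: {n:,} users per group")
```

Halving the MDE roughly quadruples the users needed per group, so it is worth agreeing on the smallest effect that matters before the experiment starts.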
Step 2: Bucket Management System
Hash-Based Stable Bucketing:
import hashlib
from typing import Literal


class BucketManager:
    """
    User ID-based stable bucket assignment
    """

    def __init__(self, experiment_id: str, num_buckets: int = 100):
        self.experiment_id = experiment_id
        self.num_buckets = num_buckets

    def assign_bucket(self, user_id: str) -> int:
        """
        Assign user to bucket (deterministic)
        """
        # Combine user ID and experiment ID, then hash
        hash_input = f"{self.experiment_id}:{user_id}"
        hash_value = hashlib.md5(hash_input.encode()).hexdigest()
        # Convert to bucket number 0..num_buckets-1 (0-99 by default)
        bucket = int(hash_value, 16) % self.num_buckets
        return bucket

    def get_variant(
        self,
        user_id: str,
        control_pct: float = 50.0,
        treatment_pct: float = 50.0
    ) -> Literal['control', 'treatment', 'holdout']:
        """
        Determine experiment group from bucket number

        Args:
            control_pct: Control group percentage (0-100)
            treatment_pct: Treatment group percentage (0-100)
            Remainder becomes holdout (excluded from analysis)
        """
        bucket = self.assign_bucket(user_id)
        # Scale the bucket to a 0-100 position so the percentages
        # stay correct when num_buckets is not 100
        position = bucket * 100 / self.num_buckets
        if position < control_pct:
            return 'control'
        elif position < control_pct + treatment_pct:
            return 'treatment'
        else:
            return 'holdout'


# Usage example
manager = BucketManager(experiment_id="inspiration_weight_v1")
variant = manager.get_variant(
    user_id="user_12345",
    control_pct=45.0,
    treatment_pct=45.0
)
print(f"User assignment: {variant}")
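Before trusting the assignment, it helps to confirm that the hash spreads users evenly across buckets. The following is a minimal sketch of such a check, assuming synthetic user IDs and a plain Pearson chi-square statistic; the helper names `md5_bucket` and `chi_square_uniformity` are hypothetical and simply mirror the hashing scheme shown above.

```python
import hashlib
from collections import Counter


def md5_bucket(experiment_id: str, user_id: str, num_buckets: int = 100) -> int:
    """Same hashing scheme as BucketManager.assign_bucket above."""
    digest = hashlib.md5(f"{experiment_id}:{user_id}".encode()).hexdigest()
    return int(digest, 16) % num_buckets


def chi_square_uniformity(counts, num_buckets: int) -> float:
    """Pearson chi-square statistic against a uniform expectation."""
    total = sum(counts.values())
    expected = total / num_buckets
    return sum((counts.get(b, 0) - expected) ** 2 / expected
               for b in range(num_buckets))


# Synthetic user IDs, for illustration only
user_ids = [f"user_{i}" for i in range(100_000)]
counts = Counter(md5_bucket("inspiration_weight_v1", uid) for uid in user_ids)
chi2 = chi_square_uniformity(counts, num_buckets=100)

# With 100 buckets there are 99 degrees of freedom, so a well-mixed hash
# should give a statistic near 99 (standard deviation about 14).
print(f"chi-square = {chi2:.1f}, "
      f"min bucket = {min(counts.values())}, max bucket = {max(counts.values())}")
```

A statistic far above the degrees of freedom would suggest skewed buckets, for example because the user ID format interacts badly with the hashing or modulo step.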
Step 3: Guardrail Monitoring System
Real-time Anomaly Detection:
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class GuardrailThreshold:
    """Guardrail threshold definition"""
    metric_name: str
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    relative_change: Optional[float] = None  # Relative change from baseline


class GuardrailMonitor:
    """
    Guardrail metric monitoring and auto-stop
    """

    def __init__(self, thresholds: List[GuardrailThreshold]):
        self.thresholds = {t.metric_name: t for t in thresholds}
        self.violations = []

    def check_metrics(
        self,
        treatment_metrics: Dict[str, float],
        control_metrics: Dict[str, float]
    ) -> bool:
        """
        Check for guardrail violations

        Returns:
            True: OK to continue, False: Must stop
        """
        self.violations = []
        for metric_name, threshold in self.thresholds.items():
            treatment_value = treatment_metrics.get(metric_name)
            control_value = control_metrics.get(metric_name)
            if treatment_value is None:
                continue

            # Absolute value checks ("is not None" so a threshold of 0 still applies)
            if threshold.min_value is not None and treatment_value < threshold.min_value:
                self.violations.append(
                    f"{metric_name}: {treatment_value:.4f} < min {threshold.min_value}"
                )
            if threshold.max_value is not None and treatment_value > threshold.max_value:
                self.violations.append(
                    f"{metric_name}: {treatment_value:.4f} > max {threshold.max_value}"
                )

            # Relative change check (skipped when control is missing or zero)
            if threshold.relative_change is not None and control_value:
                pct_change = (treatment_value - control_value) / control_value
                if abs(pct_change) > threshold.relative_change:
                    self.violations.append(
                        f"{metric_name}: {pct_change:.2%} change > threshold {threshold.relative_change:.2%}"
                    )
        return len(self.violations) == 0


# Configuration example
guardrails = [
    GuardrailThreshold(
        metric_name='avg_session_duration',
        min_value=300,         # Minimum 5 minutes
        relative_change=0.30   # Within ±30%
    ),
    GuardrailThreshold(
        metric_name='bounce_rate',
        max_value=0.70,        # Maximum 70%
        relative_change=0.20
    ),
    GuardrailThreshold(
        metric_name='crash_rate',
        max_value=0.01         # Maximum 1%
    )
]
monitor = GuardrailMonitor(guardrails)
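The absolute and relative checks can disagree, which is why the configuration above carries both. The short sketch below works through one case by hand, using made-up metric snapshots and the same thresholds as the `bounce_rate` and `crash_rate` entries; the `relative_change` helper is a hypothetical stand-in for the calculation inside `check_metrics`.

```python
def relative_change(treatment_value: float, control_value: float) -> float:
    """Relative change of treatment versus control (0.10 means +10%)."""
    return (treatment_value - control_value) / control_value


# Hypothetical metric snapshots, for illustration only
control = {"bounce_rate": 0.50, "crash_rate": 0.004}
treatment = {"bounce_rate": 0.62, "crash_rate": 0.005}

bounce_change = relative_change(treatment["bounce_rate"], control["bounce_rate"])
bounce_violates_relative = abs(bounce_change) > 0.20          # relative_change=0.20
bounce_violates_absolute = treatment["bounce_rate"] > 0.70    # max_value=0.70
crash_violates_absolute = treatment["crash_rate"] > 0.01      # max_value=0.01

print(f"bounce_rate change: {bounce_change:.2%}")
print(bounce_violates_relative, bounce_violates_absolute, crash_violates_absolute)
```

Here the treatment bounce rate stays under the 70% cap, yet it has moved about 24% relative to control, so only the relative check would stop the experiment.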
Benchmark: Statistical Test Comparison
| Test Method | Computation Speed | Small Sample Accuracy | Multiple Comparison Support | Recommended Use |
|---|---|---|---|---|
| t-test | ⭐⭐⭐ | ⭐⭐ | ❌ | Single metric, normal distribution |
| Mann-Whitney U | ⭐⭐ | ⭐⭐⭐ | ❌ | Non-normal distribution |
| Bootstrap | ⭐ | ⭐⭐⭐ | ✅ | Complex metrics |
| Sequential | ⭐⭐⭐ | ⭐⭐ | ✅ | Early stopping needed |
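As an illustration of the bootstrap row, here is a minimal percentile-bootstrap sketch for the difference in means between two groups. It uses only the standard library, and the data, the function name `bootstrap_diff_ci`, and the resample count are assumptions chosen for the example rather than recommendations.

```python
import random
from statistics import mean


def bootstrap_diff_ci(control, treatment, n_resamples=2000, level=0.95, seed=0):
    """Percentile bootstrap CI for mean(treatment) - mean(control)."""
    rng = random.Random(seed)
    diffs = []
    for _ in range(n_resamples):
        # Resample each group with replacement, keeping group sizes fixed
        c = rng.choices(control, k=len(control))
        t = rng.choices(treatment, k=len(treatment))
        diffs.append(mean(t) - mean(c))
    diffs.sort()
    tail = (1 - level) / 2
    lo = diffs[int(tail * n_resamples)]
    hi = diffs[int((1 - tail) * n_resamples) - 1]
    return lo, hi


# Hypothetical per-user session minutes, for illustration only
control = [10, 12, 11, 13, 9, 10, 12, 11]
treatment = [15, 17, 16, 18, 14, 15, 17, 16]

lo, hi = bootstrap_diff_ci(control, treatment)
print(f"95% CI for the difference in means: [{lo:.2f}, {hi:.2f}]")
```

The same loop works for metrics without a closed-form variance (ratios, medians, percentiles) by swapping `mean` for the statistic of interest, at the cost of the slower computation noted in the table.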
Failure Patterns and Mitigation
| Symptom | Cause | Mitigation |
|---|---|---|
| p-value oscillates around 0.05 (0.049↔0.051) | Insufficient sample size | Collect the pre-calculated sample size before deciding; make no early decisions |
| Frequent guardrail false positives | Thresholds too strict | Calibrate thresholds with historical data |
| Bucket imbalance | Hash function bias | Use MD5/SHA256, validate distribution |
| New users always in treatment | Fixed experiment ID | Change ID per experiment |
Statistical Testing Implementation
Sequential Testing:
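from typing import Optional, Tuple

import numpy as np
from scipy import stats


class SequentialTest:
    """
    Sequential testing using Always Valid Inference (AVI)

    Note: the boundary below is a normal-mixture (mSPRT-style) boundary in
    the spirit of Johari et al. (2017). `tau` is an assumed tuning
    parameter: the standard deviation of the mixing prior on the true
    difference in rates.
    """

    def __init__(self, alpha: float = 0.05, tau: float = 0.01):
        self.alpha = alpha
        self.tau = tau
        # Rejecting when the mixture likelihood ratio exceeds 1/alpha
        # contributes 2 * ln(1/alpha) to the squared z boundary
        self.boundary_constant = -2 * np.log(alpha)

    def test(
        self,
        control_conversions: int,
        control_total: int,
        treatment_conversions: int,
        treatment_total: int
    ) -> Tuple[bool, Optional[float], str]:
        """
        Execute sequential A/B test

        Returns:
            (significant, p-value estimate,
             decision: 'continue'/'stop_treatment_wins'/'stop_control_wins')
        """
        if control_total <= 0 or treatment_total <= 0:
            return False, None, 'continue'

        # Estimate difference in proportions
        p_control = control_conversions / control_total
        p_treatment = treatment_conversions / treatment_total
        diff = p_treatment - p_control

        # Standard error
        se = np.sqrt(
            p_control * (1 - p_control) / control_total +
            p_treatment * (1 - p_treatment) / treatment_total
        )
        if se == 0:
            return False, None, 'continue'

        # Z-statistic
        z_score = diff / se

        # Sequential boundary (sample size dependent through se):
        # z^2 > (1 + se^2/tau^2) * (2*ln(1/alpha) + ln(1 + tau^2/se^2)).
        # It never drops below sqrt(2*ln(1/alpha)), so it is stricter than
        # the fixed-horizon 1.96 cutoff at every look.
        var_ratio = self.tau ** 2 / se ** 2
        boundary = np.sqrt(
            (1 + 1 / var_ratio) * (self.boundary_constant + np.log(1 + var_ratio))
        )

        # Decision
        if abs(z_score) > boundary:
            if z_score > 0:
                return True, self._estimate_p_value(z_score), 'stop_treatment_wins'
            else:
                return True, self._estimate_p_value(z_score), 'stop_control_wins'
        else:
            return False, None, 'continue'

    def _estimate_p_value(self, z_score: float) -> float:
        """Naive fixed-horizon p-value from the z-statistic
        (not adjusted for sequential monitoring)"""
        return 2 * (1 - stats.norm.cdf(abs(z_score)))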
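To see why a sequential boundary is needed at all, the sketch below simulates A/A experiments (no true difference) and compares two decision rules: stopping the first time a fixed-horizon z-test crosses 1.96 at any interim look, versus testing once at the end. This is a standard-library illustration with made-up traffic numbers; `simulate_aa` and its parameters are hypothetical.

```python
import random
from math import sqrt


def z_stat(c_conv, c_n, t_conv, t_n):
    """Two-proportion z-statistic with unpooled standard error."""
    p_c, p_t = c_conv / c_n, t_conv / t_n
    se = sqrt(p_c * (1 - p_c) / c_n + p_t * (1 - p_t) / t_n)
    return (p_t - p_c) / se if se > 0 else 0.0


def simulate_aa(n_sims=300, looks=10, users_per_look=200, rate=0.05, seed=1):
    """Return (false-positive rate with peeking, with a single final look)."""
    rng = random.Random(seed)
    peeking_hits = final_hits = 0
    for _sim in range(n_sims):
        c_conv = t_conv = n = 0
        crossed = False
        z = 0.0
        for _look in range(looks):
            # Both arms share the same true rate, so any "win" is a false positive
            c_conv += sum(rng.random() < rate for _ in range(users_per_look))
            t_conv += sum(rng.random() < rate for _ in range(users_per_look))
            n += users_per_look
            z = z_stat(c_conv, n, t_conv, n)
            crossed = crossed or abs(z) > 1.96
        peeking_hits += crossed
        final_hits += abs(z) > 1.96
    return peeking_hits / n_sims, final_hits / n_sims


peeking_fpr, final_fpr = simulate_aa()
print(f"False-positive rate with peeking: {peeking_fpr:.1%}")
print(f"False-positive rate, final look only: {final_fpr:.1%}")
```

With repeated looks the peeking rate typically lands well above the nominal 5%, which is the inflation that always-valid boundaries are designed to remove.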
Automation & Extension Patterns
- Multi-Armed Bandit Integration: Dynamic traffic allocation via Thompson Sampling
- Hierarchical Bayesian Model: Simultaneous estimation of user segment effects
- Causal Inference Framework: Confounder adjustment and ATE estimation
- Meta-Analysis Pipeline: Prior distribution construction from past experiments
- Auto-Report Generation: Statistical test result visualization and Slack notifications
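As a rough picture of the first pattern in the list above, Thompson Sampling with Beta posteriors can be sketched in a few lines. The conversion rates, round count, and function name `thompson_sampling` below are assumptions for illustration; a production bandit would also need guardrails and delayed-feedback handling.

```python
import random


def thompson_sampling(true_rates, rounds=5000, seed=42):
    """Simulate Beta-Bernoulli Thompson Sampling; return pulls per arm."""
    rng = random.Random(seed)
    successes = [0] * len(true_rates)
    failures = [0] * len(true_rates)
    pulls = [0] * len(true_rates)
    for _ in range(rounds):
        # Draw one plausible rate per arm from its Beta(1 + s, 1 + f) posterior
        samples = [rng.betavariate(1 + s, 1 + f)
                   for s, f in zip(successes, failures)]
        # Route this user to the arm with the highest sampled rate
        arm = max(range(len(true_rates)), key=samples.__getitem__)
        pulls[arm] += 1
        if rng.random() < true_rates[arm]:
            successes[arm] += 1
        else:
            failures[arm] += 1
    return pulls


# Hypothetical true conversion rates: arm 0 = control, arm 1 = treatment
pulls = thompson_sampling([0.05, 0.15])
print(f"Traffic per arm: {pulls}")
```

Traffic shifts toward the better arm as evidence accumulates, which lowers the cost of running a weak variant but makes classical fixed-split significance tests harder to apply.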
Next Steps
- Sora App Recommendation Algorithm Implementation - Basic recommendation system design
Implementation Notes:
- Sequential testing enables early stopping, but the traditional fixed-horizon p-value interpretation doesn't apply
- Set guardrail thresholds from the 95th percentile of historical data
- Bucket allocation should be user-based; session-based allocation risks confounding because the same user can see both variants
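One way to read the percentile recommendation above is: collect the relative differences observed between identical groups in the past (for example from A/A runs), and take the 95th percentile of their magnitudes as the `relative_change` threshold. The sketch below assumes that interpretation and uses synthetic data; `calibrate_relative_threshold` is a hypothetical helper.

```python
from statistics import quantiles


def calibrate_relative_threshold(historical_changes, percentile=95):
    """Pick the given percentile of |relative change| seen historically."""
    magnitudes = sorted(abs(x) for x in historical_changes)
    # n=100 yields 99 cut points: the 1st through 99th percentiles
    cut_points = quantiles(magnitudes, n=100)
    return cut_points[percentile - 1]


# Synthetic historical relative changes (0.1% to 10%), for illustration only
historical = [i / 1000 for i in range(1, 101)]
threshold = calibrate_relative_threshold(historical)
print(f"Suggested relative_change threshold: {threshold:.4f}")
```

A threshold set this way should trip on roughly 5% of ordinary fluctuations per check, so metrics that are polled often may need a higher percentile to keep false alarms manageable.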
References:
- Johari et al. (2017): "Peeking at A/B Tests"
- Netflix Tech Blog: "Building Confidence in A/B Testing"
- Optimizely Stats Engine: Sequential Testing whitepaper