"""V2 stress scenario catalog for BCF development.
Public API
----------
Template-based API:
- ``TEMPLATES``: ``dict[str, Callable]`` — registered scenario templates.
Keys: ``"constant"``, ``"reversal"``, ``"sparse_benefit"``,
``"nonlinear"``, ``"monotone_gradient"``, ``"interaction_only"``,
``"clustered"``. Each template callable has signature
``(metric_id, effect_scale, ...) -> (V2GeneratorConfig, dict)``.
- ``build_catalog(...)`` returns ``list[ScenarioSpec]`` — a star-sweep
catalog of ``(scenario_fn, kwargs)`` tuples for ``run_suite_parallel``.
- ``PROFILES``: ``dict[str, dict]`` — preset catalog configurations.
Keys: ``"validate"``, ``"standard"``, ``"thorough"``.
Design
------
- Template callables accept ``seed`` and other knobs as kwargs so that
``(template_fn, kwargs)`` pairs are fully reproducible.
- Metadata dicts carry at minimum ``"scenario_class"`` plus all
control-knob keys so downstream benchmarking loops can group, filter,
and compare runs.
- Configs are ready to pass directly to ``generate_v2_core()``.
- No imports from v1 generator internals.
"""
from __future__ import annotations
import dataclasses
from collections.abc import Callable
from typing import Any, cast
import numpy as np
import pandas as pd
from scipy.special import expit, logit
from pytyche.contracts import CartRevenueConfig, ProductCategory
from pytyche.generators.core import (
AssignmentConfig,
CopulaConfig,
FeatureSpec,
MetricMode,
MixtureConfig,
SurfaceConfig,
V2GeneratorConfig,
sigmoid_surface,
threshold_surface,
)
# Re-export for backward compatibility — callers that import these from
# generate_scenarios continue to work after the move to contracts.py.
__all__ = [
"CartRevenueConfig",
"ProductCategory",
]
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _identity_corr(n: int) -> np.ndarray:
"""Return an n×n identity correlation matrix (no inter-feature correlation)."""
return np.eye(n)
def _relative_lift_to_logit_shift(conversion_delta: float, baseline_rate: float) -> float:
"""Convert relative lift to logit-space shift.
``conversion_delta=0.15`` means "+15% relative lift."
At ``baseline_rate=0.03``: ``p1 = 0.03 × 1.15 = 0.0345``.
The returned shift satisfies ``expit(logit(baseline_rate) + shift) == p1``.
Parameters
----------
conversion_delta:
Relative lift (e.g. +0.15 = +15%, -0.20 = -20%).
baseline_rate:
Baseline conversion probability at the population centroid.
Returns
-------
float
Logit-space shift to apply to the baseline logit.
"""
p0 = baseline_rate
p1 = p0 * (1.0 + conversion_delta)
# Clamp p1 to a valid probability range to avoid logit(0) or logit(1).
p1 = max(min(p1, 0.9999), 0.0001)
return float(logit(p1) - logit(p0))
def _severity_multiplicative_factor(severity_delta: float) -> float:
"""Convert a relative severity lift to a multiplicative factor.
``severity_delta=0.10`` means "+10% AOV lift": ``m1 = m0 × 1.10``.
The returned factor is ``1 + severity_delta``.
Parameters
----------
severity_delta:
Relative lift (e.g. +0.10 = +10%, -0.15 = -15%).
Returns
-------
float
Multiplicative factor for the severity mean.
Raises
------
ValueError
If ``severity_delta <= -1.0`` (would produce a non-positive factor).
"""
if severity_delta <= -1.0:
raise ValueError(
f"severity_delta must be > -1.0 to keep factor positive, got {severity_delta}"
)
return 1.0 + severity_delta
# ---------------------------------------------------------------------------
# Default mixed-type feature schema (Task 3.1)
# ---------------------------------------------------------------------------
#: Default core features for non-clustered templates.
#: Semantic e-commerce names, mixed types: 2 continuous + 2 categorical + 1 binary.
_DEFAULT_CORE_FEATURES: tuple[FeatureSpec, ...] = (
FeatureSpec(name="session_recency", kind="continuous"),
FeatureSpec(name="browse_depth", kind="continuous"),
FeatureSpec(name="device_type", kind="categorical"),
FeatureSpec(name="is_returning", kind="binary"),
FeatureSpec(name="channel", kind="categorical"),
)
def _build_default_mixture_sampler(n_nuisance: int = 5) -> MixtureConfig:
"""Build the default single-cluster MixtureConfig with realistic e-commerce marginals.
Schema:
session_recency — continuous, mean=2.0 std=1.5 (right-skewed recency proxy)
browse_depth — continuous, mean=3.0 std=2.0 (pages viewed per session)
device_type — categorical, mobile 55% / desktop 35% / tablet 10%
is_returning — binary, 35% True (65% new visitors)
channel — categorical, organic 40% / paid 30% / social 20% / email 10%
z0..z{n-1} — continuous nuisance features, N(0, 1)
"""
nuisance_features: tuple[FeatureSpec, ...] = tuple(
FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)
)
all_features = _DEFAULT_CORE_FEATURES + nuisance_features
cluster_params: dict[str, Any] = {
"session_recency_mean": 2.0,
"session_recency_std": 1.5,
"browse_depth_mean": 3.0,
"browse_depth_std": 2.0,
"device_type_probs": {"mobile": 0.55, "desktop": 0.35, "tablet": 0.10},
"is_returning_prob": 0.35,
"channel_probs": {"organic": 0.40, "paid": 0.30, "social": 0.20, "email": 0.10},
}
for i in range(n_nuisance):
cluster_params[f"z{i}_mean"] = 0.0
cluster_params[f"z{i}_std"] = 1.0
return MixtureConfig(
features=all_features,
weights=(1.0,),
cluster_params=(cluster_params,),
)
def _const_surface(value: float) -> SurfaceConfig:
"""Surface that returns the same scalar for every visitor."""
return SurfaceConfig(fn=lambda X, v=value: np.full(len(X), v))
[docs]
@dataclasses.dataclass(frozen=True)
class DriverInfo:
"""Resolved driver feature statistics from a sampler config.
Two kinds: ``"continuous"`` or ``"categorical"``.
Binary features (FeatureSpec kind="binary") map to categorical with
``levels=(True, False)`` or ``(False, True)`` depending on frequency.
Fields:
name: Feature column name.
kind: ``"continuous"`` or ``"categorical"`` — no separate binary kind.
mean: Population-weighted mean (continuous only, 0.0 for categorical).
std: Population marginal std (continuous only, 1.0 for categorical).
levels: Actual level values sorted by ``(-prob, str(level))`` for
deterministic tie-breaking. ``None`` for continuous.
"""
name: str
kind: str # "continuous" or "categorical"
mean: float = 0.0
std: float = 1.0
levels: tuple[Any, ...] | None = None
def __post_init__(self) -> None:
if self.kind not in ("continuous", "categorical"):
raise ValueError(
f"DriverInfo: kind must be 'continuous' or 'categorical', got {self.kind!r}"
)
if self.kind == "categorical" and self.levels is None:
raise ValueError("DriverInfo: categorical driver must have levels")
if self.kind == "continuous" and self.levels is not None:
raise ValueError("DriverInfo: continuous driver must not have levels")
_VALID_SIGNAL_TYPES: frozenset[str] = frozenset({"linear", "interaction", "quadratic", "step"})
[docs]
@dataclasses.dataclass(frozen=True)
class SignalTerm:
"""One additive term in a within-cluster CATE surface.
Fields:
signal_type: One of ``"linear"``, ``"interaction"``, ``"quadratic"``, ``"step"``.
drivers: Feature name(s) for this term. Count is type-dependent.
amplitude: Signed weight for this term.
level_values: Per-level signal values for ``"step"`` type; ``None`` for others.
"""
signal_type: str
drivers: tuple[str, ...]
amplitude: float
level_values: dict[str, float] | None = None
def __post_init__(self) -> None:
if self.signal_type not in _VALID_SIGNAL_TYPES:
raise ValueError(
f"SignalTerm: signal_type must be one of {sorted(_VALID_SIGNAL_TYPES)}, "
f"got {self.signal_type!r}"
)
if self.signal_type == "linear":
if len(self.drivers) != 1:
raise ValueError(
f"SignalTerm: 'linear' requires exactly 1 driver, "
f"got {len(self.drivers)}"
)
elif self.signal_type == "interaction":
if len(self.drivers) != 2:
raise ValueError(
f"SignalTerm: 'interaction' requires exactly 2 drivers, "
f"got {len(self.drivers)}"
)
elif self.signal_type == "quadratic":
if len(self.drivers) != 1:
raise ValueError(
f"SignalTerm: 'quadratic' requires exactly 1 driver, "
f"got {len(self.drivers)}"
)
elif self.signal_type == "step":
if len(self.drivers) != 1:
raise ValueError(
f"SignalTerm: 'step' requires exactly 1 driver, "
f"got {len(self.drivers)}"
)
if not self.level_values:
raise ValueError(
"SignalTerm: 'step' requires level_values to be provided and non-empty"
)
[docs]
@dataclasses.dataclass(frozen=True)
class ClusterEffectProfile:
"""Treatment effect + prognostic specification for one cluster.
The latent cluster type drives both prognostic outcomes (p0, m0) and treatment
response (p1-p0, m1-m0). Observed features are noisy windows into the latent
type — BCF only sees features, not cluster_id.
Fields:
conversion_delta: Relative lift for conversion probability (e.g. +0.15 = +15%).
severity_delta: Relative lift for severity mean (e.g. -0.10 = -10%).
conversion_signals: Additive within-cluster signal terms for conversion HTE.
severity_signals: Additive within-cluster signal terms for severity HTE.
prognostic_conv_base: Logit-space shift from global baseline_rate for this
cluster's p0. Centered across clusters so weighted mean = 0.
prognostic_sev_base: Log-space shift from global m0_base for this cluster's
m0. Centered across clusters so weighted mean = 0.
prognostic_conv_signals: Within-cluster feature-driven p0 gradients.
prognostic_sev_signals: Within-cluster feature-driven m0 gradients.
"""
conversion_delta: float = 0.0
severity_delta: float = 0.0
conversion_signals: tuple[SignalTerm, ...] = ()
severity_signals: tuple[SignalTerm, ...] = ()
prognostic_conv_base: float = 0.0
prognostic_sev_base: float = 0.0
prognostic_conv_signals: tuple[SignalTerm, ...] = ()
prognostic_sev_signals: tuple[SignalTerm, ...] = ()
def _resolve_driver(
driver: str,
sampler: CopulaConfig | MixtureConfig,
) -> DriverInfo:
"""Resolve driver feature statistics from a sampler config.
Extracts population-level statistics (mean, std, or level probabilities)
from the sampler so templates can center surfaces and select target levels
without hardcoding feature-specific values.
Parameters
----------
driver:
Feature name to resolve.
sampler:
Active sampler config (``MixtureConfig`` or ``CopulaConfig``).
Returns
-------
DriverInfo
Resolved statistics for the driver feature.
Raises
------
ValueError
If the driver is not found in the sampler's feature list.
"""
# Find feature spec in sampler.
feat_spec: FeatureSpec | None = None
for spec in sampler.features:
if spec.name == driver:
feat_spec = spec
break
if feat_spec is None:
raise ValueError(
f"Driver feature {driver!r} not found in sampler features. "
f"Known features: {[s.name for s in sampler.features]}"
)
if isinstance(sampler, MixtureConfig):
return _resolve_driver_mixture(driver, feat_spec, sampler)
else:
return _resolve_driver_copula(driver, feat_spec)
def _resolve_driver_mixture(
driver: str,
feat_spec: FeatureSpec,
sampler: MixtureConfig,
) -> DriverInfo:
"""Resolve driver from MixtureConfig using weighted population stats."""
total_weight = sum(sampler.weights)
w_norm = [w / total_weight for w in sampler.weights]
if feat_spec.kind == "continuous":
pop_mean = sum(
w * params[f"{driver}_mean"]
for w, params in zip(w_norm, sampler.cluster_params, strict=False)
)
pop_var = (
sum(
w * (params[f"{driver}_std"] ** 2 + params[f"{driver}_mean"] ** 2)
for w, params in zip(w_norm, sampler.cluster_params, strict=False)
)
- pop_mean**2
)
pop_std = float(np.sqrt(max(pop_var, 0.0)))
return DriverInfo(name=driver, kind="continuous", mean=pop_mean, std=pop_std)
elif feat_spec.kind == "binary":
# Binary → categorical with bool levels.
prob_true = sum(
w * params[f"{driver}_prob"]
for w, params in zip(w_norm, sampler.cluster_params, strict=False)
)
prob_false = 1.0 - prob_true
# Sort by (-prob, str(level)) for deterministic ordering.
level_probs = [(True, prob_true), (False, prob_false)]
sorted_levels = sorted(level_probs, key=lambda lp: (-lp[1], str(lp[0])))
levels = tuple(lp[0] for lp in sorted_levels)
return DriverInfo(name=driver, kind="categorical", levels=levels)
else:
# Categorical: aggregate weighted probabilities across clusters.
combined_probs: dict[str, float] = {}
for w, params in zip(w_norm, sampler.cluster_params, strict=False):
probs_dict = params.get(f"{driver}_probs", {})
for level, p in probs_dict.items():
combined_probs[level] = combined_probs.get(level, 0.0) + w * p
sorted_levels_cat = sorted(
combined_probs.keys(), key=lambda lvl: (-combined_probs[lvl], str(lvl))
)
levels = tuple(sorted_levels_cat)
return DriverInfo(name=driver, kind="categorical", levels=levels)
def _resolve_driver_copula(
driver: str,
feat_spec: FeatureSpec,
) -> DriverInfo:
"""Resolve driver from CopulaConfig using known defaults."""
if feat_spec.kind == "continuous":
return DriverInfo(name=driver, kind="continuous", mean=0.0, std=1.0)
elif feat_spec.kind == "binary":
# Default binary: 0.5 each → tie-broken by str: "False" < "True"
return DriverInfo(name=driver, kind="categorical", levels=(False, True))
else:
raise ValueError(
f"CopulaConfig does not provide category level information for "
f"categorical feature {driver!r}. Use MixtureConfig for "
f"categorical driver resolution."
)
# ---------------------------------------------------------------------------
# Shared builder (Task 3.1)
# ---------------------------------------------------------------------------
def _build_scenario(
*,
template_name: str,
p0_surface: SurfaceConfig,
p1_surface_fn: Callable[[float], SurfaceConfig],
metric_id: str,
effect_scale: float,
severity_sigma: float = 0.5,
n_visitors: int = 1000,
n_nuisance: int = 5,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
sampler: CopulaConfig | MixtureConfig | None = None,
revenue_model: str | CartRevenueConfig = "lognormal",
m0_surface: SurfaceConfig | None = None,
m1_surface_fn: Callable[[float], SurfaceConfig] | None = None,
channel_mode: str | None = None,
prognostic_features: list[str] | None = None,
prescriptive_features: list[str] | None = None,
driver: str | None = None,
severity_driver: str = "browse_depth",
) -> tuple[V2GeneratorConfig, dict]:
"""Shared wiring for all template-based scenario construction.
Handles feature list construction, sampler setup, MetricMode assembly,
hurdle severity mirroring, config assembly, and metadata emission.
Parameters
----------
template_name:
Used in metadata as ``scenario_class`` and ``template``.
p0_surface:
Baseline conversion surface (control arm probability surface).
p1_surface_fn:
Callable of ``effect_scale`` returning the treatment conversion surface.
Must produce p1 == p0 when effect_scale == 0.
metric_id:
``"conversion_rate"`` or ``"revenue_per_visitor"``.
effect_scale:
Controls CATE magnitude. 0 = A/A (p1 = p0).
severity_sigma:
LogNormal sigma for hurdle severity surfaces.
n_visitors:
Total visitor count.
n_nuisance:
Number of additional uncorrelated nuisance features (z0..z{n-1}).
treatment_allocation:
Fraction assigned to treatment.
seed:
Random seed.
baseline_rate:
Baseline conversion probability. Sets logit-additive intercept for
new shapes; for existing shapes with their own baseline structure,
this is recorded in metadata but shapes may use their own constants.
sampler:
Override the default MixtureConfig sampler. When ``MixtureConfig``
is provided, ``cluster_id`` is available to surfaces during generation
but stripped from output by ``generate_v2_core`` (latent, not observed).
revenue_model:
Revenue model type. ``"lognormal"`` (default) or a CartRevenueConfig.
m1_surface_fn:
Optional callable of ``effect_scale`` returning the treatment severity
mean surface for hurdle metrics. When provided, overrides the default
constant m1 surface (``m0_base + effect_scale * 5``). When ``None``,
falls back to the constant severity offset. Used by templates that want
severity surfaces to mirror the conversion spatial pattern.
channel_mode:
Controls severity surface construction for hurdle metrics. Binary
metrics ignore this parameter.
- ``None`` / ``"mirror"`` (default): existing behavior — use
``m1_surface_fn`` if provided, else constant offset.
- ``"opposing"``: negate the severity delta. If ``m1_surface_fn`` is
provided, wrap its output to flip the sign of the delta around
``m0_base``; otherwise ``m1 = m0_base - effect_scale * 5``.
- ``"conversion_only"``: ``m0 = m1`` (identical severity surfaces,
zero AOV component).
- ``"severity_only"``: ``p0 = p1`` (identical conversion surfaces,
zero conversion component). Severity varies per the normal
``m1_surface_fn`` / constant offset logic.
- ``"orthogonal"``: conversion uses the template's driver, severity
uses ``severity_driver`` (independent features). The template's
``p1_surface_fn`` is preserved; the severity surface becomes
``m1 = m0_base + effect_scale * severity_slope * (severity_driver - mean)``.
prognostic_features:
Optional list of feature names that drive only the baseline surfaces
(``p0``, ``m0``). When specified, baseline surfaces receive only
these columns from the feature matrix X. When ``None``, all features
are available to baseline surfaces (backward compatible).
prescriptive_features:
Optional list of feature names that drive only the treatment delta
surfaces (``p1 - p0``, ``m1 - m0``). When specified, treatment
surfaces receive only these columns. When ``None``, all features are
available (backward compatible).
driver:
Name of the primary feature driving CATE heterogeneity. When provided,
recorded in metadata as ``"driver"``. When ``None``, the key is absent
from metadata.
severity_driver:
Feature name for the orthogonal severity channel (default
``"browse_depth"``). Only used when ``channel_mode="orthogonal"``.
Returns
-------
tuple[V2GeneratorConfig, dict]
Config ready for ``generate_v2_core()`` and machine-readable metadata.
"""
# 1. Sampler setup: use provided sampler or build default MixtureConfig.
# Default is a single-cluster MixtureConfig with semantic mixed-type e-commerce features.
# CopulaConfig is still available via the sampler override.
if sampler is None:
feature_sampler: CopulaConfig | MixtureConfig = _build_default_mixture_sampler(n_nuisance)
else:
feature_sampler = sampler
# 2b. Feature separation: wrap surfaces to zero out non-designated columns in X.
# When prognostic_features is specified, p0 (baseline) receives X with all other
# continuous feature columns zeroed — removing their variation.
# When prescriptive_features is specified, p1 surface receives X with all other
# continuous feature columns zeroed.
# When neither is specified, all features are shared (backward compatible).
# Zeroing (not projection) is used so surface functions that read any column
# still work — they just receive 0.0 for non-designated features.
def _zero_non_designated(X: pd.DataFrame, keep_cols: list[str]) -> pd.DataFrame:
"""Return a copy of X with continuous (float) feature columns NOT in keep_cols set to 0."""
X_masked = X.copy()
for col in X_masked.columns:
if col not in keep_cols and X_masked[col].dtype == float:
X_masked[col] = 0.0
return X_masked
if prognostic_features is not None:
_prog_cols = list(prognostic_features)
_orig_p0_fn = p0_surface.fn
p0_surface = SurfaceConfig(
fn=lambda X, _fn=_orig_p0_fn, _cols=_prog_cols: _fn(_zero_non_designated(X, _cols))
)
if prescriptive_features is not None:
_presc_cols = list(prescriptive_features)
_orig_p1_fn_builder = p1_surface_fn
def _wrapped_p1_surface_fn(es: float, _builder=_orig_p1_fn_builder, _cols=_presc_cols) -> SurfaceConfig:
_inner = _builder(es)
return SurfaceConfig(fn=lambda X, _fn=_inner.fn, _c=_cols: _fn(_zero_non_designated(X, _c)))
p1_surface_fn = _wrapped_p1_surface_fn
# Also wrap m1_surface_fn (severity delta) when provided, so hurdle
# scenarios respect prescriptive feature separation on both channels.
if m1_surface_fn is not None:
_orig_m1_fn_builder = m1_surface_fn
def _wrapped_m1_surface_fn(es: float, _builder=_orig_m1_fn_builder, _cols=_presc_cols) -> SurfaceConfig:
_inner = _builder(es)
return SurfaceConfig(fn=lambda X, _fn=_inner.fn, _c=_cols: _fn(_zero_non_designated(X, _c)))
m1_surface_fn = _wrapped_m1_surface_fn
# 3. Reject incompatible knobs early.
if channel_mode is not None and metric_id == "conversion_rate":
raise ValueError(
f"channel_mode={channel_mode!r} is not supported for binary metric "
f"{metric_id!r}; channel_mode applies only to hurdle metrics"
)
# 4. Build treatment surface from the callable
p1_surface = p1_surface_fn(effect_scale)
# 5. MetricMode assembly
if metric_id == "conversion_rate":
metric_mode = MetricMode(
metric_id="conversion_rate",
p0_surface=p0_surface,
p1_surface=p1_surface,
)
elif metric_id == "revenue_per_visitor":
# Severity surfaces: constant m0 at 50.0.
# channel_mode controls how m0/m1/p0/p1 are assembled.
_m0_base = 50.0
_severity_slope = 5.0
sigma0_surface = _const_surface(severity_sigma)
sigma1_surface = _const_surface(severity_sigma)
_resolved_mode = channel_mode if channel_mode is not None else "mirror"
if _resolved_mode == "mirror":
# Default: use m1_surface_fn if provided, else constant offset.
if m0_surface is None:
m0_surface = _const_surface(_m0_base)
if m1_surface_fn is not None:
m1_surface = m1_surface_fn(effect_scale)
else:
m1_surface = _const_surface(_m0_base + effect_scale * _severity_slope)
effective_p0 = p0_surface
effective_p1 = p1_surface
elif _resolved_mode == "opposing":
# Negate severity delta: m1 reflects opposite direction to m1_surface_fn.
m0_surface = _const_surface(_m0_base)
if m1_surface_fn is not None:
# Wrap m1_surface_fn output to negate delta around m0_base:
# m1_negated = m0_base - (m1_surface_fn(es)(X) - m0_base)
# = 2 * m0_base - m1_surface_fn(es)(X)
_raw_m1 = m1_surface_fn(effect_scale)
m1_surface = SurfaceConfig(
fn=lambda X, _m=_raw_m1, _b=_m0_base: 2.0 * _b - _m.fn(X)
)
else:
m1_surface = _const_surface(_m0_base - effect_scale * _severity_slope)
effective_p0 = p0_surface
effective_p1 = p1_surface
elif _resolved_mode == "conversion_only":
# m0 == m1 (identical severity surfaces — zero AOV component).
m0_surface = _const_surface(_m0_base)
m1_surface = _const_surface(_m0_base)
effective_p0 = p0_surface
effective_p1 = p1_surface
elif _resolved_mode == "severity_only":
# p0 == p1 (identical conversion surfaces — zero conversion component).
# Severity varies per the normal m1_surface_fn / constant offset logic.
m0_surface = _const_surface(_m0_base)
if m1_surface_fn is not None:
m1_surface = m1_surface_fn(effect_scale)
else:
m1_surface = _const_surface(_m0_base + effect_scale * _severity_slope)
# Override p1 to equal p0 (no conversion effect).
effective_p0 = p0_surface
effective_p1 = p0_surface
elif _resolved_mode == "orthogonal":
# Conversion uses the template's driver (via p1_surface_fn); severity uses
# severity_driver (independent feature). Center around population mean so
# the severity CATE flips sign around 0.
_sev_info = _resolve_driver(severity_driver, feature_sampler)
_sev_mu = _sev_info.mean
m0_surface = _const_surface(_m0_base)
m1_surface = SurfaceConfig(
fn=lambda X, _es=effect_scale, _sl=_severity_slope, _b=_m0_base, _mu=_sev_mu, _sd=severity_driver: (
_b + _es * _sl * (X[_sd].values.astype(float) - _mu)
)
)
effective_p0 = p0_surface
effective_p1 = p1_surface
else:
raise ValueError(
f"_build_scenario: unsupported channel_mode {_resolved_mode!r}. "
"Must be one of 'mirror', 'opposing', 'conversion_only', "
"'severity_only', 'orthogonal', or None."
)
metric_mode = MetricMode(
metric_id="revenue_per_visitor",
p0_surface=effective_p0,
p1_surface=effective_p1,
m0_surface=m0_surface,
m1_surface=m1_surface,
sigma0_surface=sigma0_surface,
sigma1_surface=sigma1_surface,
)
else:
raise ValueError(
f"_build_scenario: unsupported metric_id {metric_id!r}. "
"Must be 'conversion_rate' or 'revenue_per_visitor'."
)
# 5. Config assembly
# Pass CartRevenueConfig to V2GeneratorConfig.revenue_model so generate_core
# can dispatch to the cart sampler instead of lognormal.
cart_config: CartRevenueConfig | None = (
revenue_model if isinstance(revenue_model, CartRevenueConfig) else None
)
config = V2GeneratorConfig(
n_visitors=n_visitors,
feature_sampler=feature_sampler,
metric_mode=metric_mode,
assignment=AssignmentConfig(treatment_allocation=treatment_allocation),
seed=seed,
experiment_id=f"{template_name}-es{effect_scale}-seed{seed}",
revenue_model=cart_config,
)
# 6. Metadata
metadata: dict[str, str | int | float] = {
"scenario_class": template_name,
"template": template_name,
"metric_id": metric_id,
"effect_scale": effect_scale,
"n_visitors": n_visitors,
"seed": seed,
"treatment_allocation": treatment_allocation,
"baseline_rate": baseline_rate,
"n_nuisance": n_nuisance,
}
# Record revenue_model in metadata: "lognormal" for default, "cart" for CartRevenueConfig.
if isinstance(revenue_model, CartRevenueConfig):
metadata["revenue_model"] = "cart"
else:
metadata["revenue_model"] = revenue_model
# Record driver in metadata when provided.
if driver is not None:
metadata["driver"] = driver
return config, metadata
# ---------------------------------------------------------------------------
# Shared template helpers
# ---------------------------------------------------------------------------
def _resolve_categorical_target(
driver_info: DriverInfo,
driver_target_level: Any | None,
template_name: str,
) -> tuple[Any, str]:
"""Resolve target level for a categorical driver.
Returns ``(target_value, "auto"|"explicit")``. Auto-selects the most
frequent level when ``driver_target_level`` is ``None``.
"""
assert driver_info.levels is not None, (
f"_resolve_categorical_target called for non-categorical driver "
f"{driver_info.name!r}"
)
if driver_target_level is None:
return driver_info.levels[0], "auto"
if driver_target_level not in driver_info.levels:
raise ValueError(
f"_template_{template_name}: driver_target_level={driver_target_level!r} "
f"not in driver {driver_info.name!r} levels: {driver_info.levels}"
)
return driver_target_level, "explicit"
def _build_signal(
driver_info: DriverInfo,
driver: str,
driver_target_level: Any | None,
template_name: str,
*,
cat_target_val: float,
cat_other_val: float,
) -> tuple[Callable, dict]:
"""Build a zero-centered spatial signal from a driver feature.
Continuous drivers produce ``lambda X: X[driver] - mu``.
Categorical drivers produce a contrast function using ``cat_target_val``
for the target level and ``cat_other_val`` for all others.
Returns ``(signal_fn, extra_metadata_dict)``.
"""
if driver_info.kind == "continuous":
_mu = driver_info.mean
def signal_fn_continuous(X, _d=driver, _m=_mu):
return np.asarray(X[_d], dtype=float) - _m
return signal_fn_continuous, {}
else:
_t, _src = _resolve_categorical_target(driver_info, driver_target_level, template_name)
def signal_fn_categorical(X, _d=driver, _tgt=_t, _tv=cat_target_val, _ov=cat_other_val):
return np.where(np.asarray(X[_d]) == _tgt, _tv, _ov).astype(float)
return signal_fn_categorical, {"driver_target_level": _t, "driver_target_source": _src}
def _evaluate_signal(
term: SignalTerm,
X: pd.DataFrame,
driver_info: dict[str, DriverInfo],
) -> np.ndarray:
"""Evaluate a SignalTerm against feature matrix X using driver_info statistics.
Dispatches on ``term.signal_type``:
- ``"linear"``: ``_build_signal`` encoding (centered continuous or contrast-coded
categorical) multiplied by ``term.amplitude``.
- ``"interaction"``: product of two ``_build_signal`` encodings, multiplied by
``term.amplitude``.
- ``"quadratic"``: squared centered continuous value, multiplied by
``term.amplitude``. Raises ``ValueError`` for categorical drivers.
- ``"step"``: per-level values from ``term.level_values`` mapped onto
``X[driver]``; unknown levels receive 0.0. Multiplied by ``term.amplitude``.
Parameters
----------
term:
SignalTerm describing signal type, drivers, amplitude, and (for step) level_values.
X:
Feature DataFrame with one row per visitor.
driver_info:
Mapping from feature name to resolved DriverInfo statistics.
Returns
-------
np.ndarray
1-D array of shape ``(len(X),)`` with the evaluated signal values.
Raises
------
ValueError
If ``signal_type="quadratic"`` is used with a categorical driver.
"""
if term.signal_type == "linear":
driver = term.drivers[0]
signal_fn, _ = _build_signal(
driver_info[driver], driver, None, "_evaluate_signal",
cat_target_val=0.5, cat_other_val=-0.5,
)
return term.amplitude * signal_fn(X)
elif term.signal_type == "interaction":
driver0 = term.drivers[0]
driver1 = term.drivers[1]
signal_fn0, _ = _build_signal(
driver_info[driver0], driver0, None, "_evaluate_signal",
cat_target_val=0.5, cat_other_val=-0.5,
)
signal_fn1, _ = _build_signal(
driver_info[driver1], driver1, None, "_evaluate_signal",
cat_target_val=0.5, cat_other_val=-0.5,
)
return term.amplitude * (signal_fn0(X) * signal_fn1(X))
elif term.signal_type == "quadratic":
driver = term.drivers[0]
info = driver_info[driver]
if info.kind != "continuous":
raise ValueError(
f"_evaluate_signal: signal_type='quadratic' requires a continuous driver, "
f"but driver={driver!r} has kind={info.kind!r}. "
f"Use a continuous feature for quadratic terms."
)
centered = np.asarray(X[driver], dtype=float) - info.mean
return term.amplitude * (centered ** 2)
elif term.signal_type == "step":
driver = term.drivers[0]
col = np.asarray(X[driver])
values = np.zeros(len(X), dtype=float)
for level, val in (term.level_values or {}).items():
values[col == level] = val
return term.amplitude * values
else:
raise ValueError(
f"_evaluate_signal: unhandled signal_type={term.signal_type!r}"
)
def _logit_additive_surfaces(
intercept: float,
raw_signal_fn: Callable,
*,
conversion_slope: float = 1.0,
severity_slope: float = 5.0,
m0_base: float = 50.0,
) -> tuple[SurfaceConfig, Callable[[float], SurfaceConfig], Callable[[float], SurfaceConfig]]:
"""Build the logit-additive (p0, p1_fn, m1_fn) triple from a raw spatial signal.
Returns:
p0_surface: sigmoid(intercept) — constant baseline.
p1_surface_fn: es → sigmoid(intercept + es × conversion_slope × signal(X))
m1_surface_fn: es → m0_base + es × severity_slope × signal(X)
"""
p0_surface = sigmoid_surface(
SurfaceConfig(fn=lambda X, _ic=intercept: np.full(len(X), _ic))
)
def p1_surface_fn(es: float) -> SurfaceConfig:
return sigmoid_surface(
SurfaceConfig(
fn=lambda X, _es=es, _ic=intercept, _sl=conversion_slope, _sig=raw_signal_fn: (
_ic + _es * _sl * _sig(X)
)
)
)
def m1_surface_fn(es: float) -> SurfaceConfig:
return SurfaceConfig(
fn=lambda X, _es=es, _ssl=severity_slope, _b=m0_base, _sig=raw_signal_fn: (
_b + _es * _ssl * _sig(X)
)
)
return p0_surface, p1_surface_fn, m1_surface_fn
def _require_continuous(
driver_info: DriverInfo,
param_label: str,
template_name: str,
) -> None:
"""Raise ValueError if a driver is not continuous."""
if driver_info.kind != "continuous":
raise ValueError(
f"{template_name} template requires continuous drivers, but "
f"{param_label}={driver_info.name!r} has kind={driver_info.kind!r}. "
f"Use a continuous feature (e.g. 'session_recency', 'browse_depth') for {template_name}."
)
def _make_correlated_nuisance(
n_clusters: int,
n_correlated: int,
*,
spread: float = 1.5,
rng_seed: int = 98765,
) -> tuple[tuple[FeatureSpec, ...], tuple[dict[str, Any], ...]]:
"""Generate cluster-correlated nuisance features.
These features have cluster-varying distributions (different means for
continuous, different probability vectors for categorical) so they
correlate with latent cluster membership. They are NOT added to any
``SignalTerm`` — they exist purely as spurious correlates of the
latent type, creating a realistic "haystack" for the estimator.
Parameters
----------
n_clusters:
Number of clusters in the mixture.
n_correlated:
Total number of correlated nuisance features to generate.
Split ~60% continuous (``xc0``, ``xc1``, ...) and
~40% categorical (``xk0``, ``xk1``, ...).
spread:
Controls how separated cluster means are for continuous features.
Higher = more separable (easier); lower = noisier projection.
rng_seed:
Seed for deterministic generation of per-cluster parameters.
Returns
-------
(feature_specs, per_cluster_param_dicts)
``feature_specs``: tuple of FeatureSpec for the new features.
``per_cluster_param_dicts``: tuple of dicts (one per cluster)
with the sampling parameters to merge into cluster_params.
"""
if n_correlated <= 0:
empty_params = tuple({} for _ in range(n_clusters))
return (), empty_params
rng = np.random.default_rng(rng_seed)
n_cont = round(n_correlated * 0.6)
n_cat = n_correlated - n_cont
feature_specs: list[FeatureSpec] = []
# Initialize one param dict per cluster
per_cluster: list[dict[str, Any]] = [{} for _ in range(n_clusters)]
# --- Continuous features: cluster-specific means, shared std=1.0 ---
for i in range(n_cont):
name = f"xc{i}"
feature_specs.append(FeatureSpec(name=name, kind="continuous"))
# Draw cluster means from N(0, spread) — seeded so deterministic
cluster_means = rng.normal(0.0, spread, size=n_clusters)
for c in range(n_clusters):
per_cluster[c][f"{name}_mean"] = float(cluster_means[c])
per_cluster[c][f"{name}_std"] = 1.0
# --- Categorical features: cluster-specific probability vectors ---
for i in range(n_cat):
name = f"xk{i}"
n_levels = int(rng.integers(3, 6)) # 3–5 levels
levels = [f"{name}_L{j}" for j in range(n_levels)]
feature_specs.append(FeatureSpec(name=name, kind="categorical"))
# Draw cluster-specific probability vectors from Dirichlet
# Concentration < 1 makes distributions peaky (different dominant levels)
alpha = np.full(n_levels, 0.5)
for c in range(n_clusters):
probs = rng.dirichlet(alpha)
per_cluster[c][f"{name}_probs"] = {
lev: float(p) for lev, p in zip(levels, probs, strict=False)
}
return tuple(feature_specs), tuple(per_cluster)
def _build_clustered_sampler(
n_nuisance: int, n_correlated: int = 0,
) -> MixtureConfig:
"""Build the 2-cluster MixtureConfig for the clustered template.
Cluster 0 (60%): high recency, mobile-heavy, more returning visitors.
Cluster 1 (40%): low recency, desktop-heavy, fewer returning visitors.
"""
corr_specs, corr_params = _make_correlated_nuisance(2, n_correlated)
nuisance_features = [FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)]
all_feature_specs = tuple(list(_DEFAULT_CORE_FEATURES) + nuisance_features + list(corr_specs))
nuisance_params = {
**{f"z{i}_mean": 0.0 for i in range(n_nuisance)},
**{f"z{i}_std": 1.0 for i in range(n_nuisance)},
}
return MixtureConfig(
features=all_feature_specs,
weights=(0.6, 0.4),
cluster_params=(
{
"session_recency_mean": 1.5, "session_recency_std": 1.0,
"browse_depth_mean": 3.0, "browse_depth_std": 1.0,
"device_type_probs": {"mobile": 0.60, "desktop": 0.25, "tablet": 0.10, "other": 0.05},
"is_returning_prob": 0.45,
"channel_probs": {"direct": 0.40, "organic": 0.30, "paid": 0.20, "social": 0.10},
**nuisance_params,
**corr_params[0],
},
{
"session_recency_mean": -1.5, "session_recency_std": 1.0,
"browse_depth_mean": 1.5, "browse_depth_std": 1.0,
"device_type_probs": {"mobile": 0.20, "desktop": 0.60, "tablet": 0.15, "other": 0.05},
"is_returning_prob": 0.20,
"channel_probs": {"direct": 0.15, "organic": 0.25, "paid": 0.45, "social": 0.15},
**nuisance_params,
**corr_params[1],
},
),
)
def _build_4cluster_ecommerce(
n_nuisance: int = 5, n_correlated: int = 0,
) -> MixtureConfig:
"""Build the 4-cluster MixtureConfig for the clustered_realistic template.
Segments:
0 (30%): loyal-mobile — frequent mobile shoppers, high recency, returning.
1 (25%): new-desktop — new desktop visitors, medium recency, not returning.
2 (25%): deal-seekers — paid/email channel, medium browse depth.
3 (20%): inactive — low recency, low browse depth, mostly organic.
"""
corr_specs, corr_params = _make_correlated_nuisance(4, n_correlated)
nuisance_features = [FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)]
all_feature_specs = tuple(list(_DEFAULT_CORE_FEATURES) + nuisance_features + list(corr_specs))
nuisance_params = {
**{f"z{i}_mean": 0.0 for i in range(n_nuisance)},
**{f"z{i}_std": 1.0 for i in range(n_nuisance)},
}
return MixtureConfig(
features=all_feature_specs,
weights=(0.30, 0.25, 0.25, 0.20),
cluster_params=(
# Cluster 0: loyal-mobile
{
"session_recency_mean": 2.5, "session_recency_std": 0.8,
"browse_depth_mean": 4.5, "browse_depth_std": 1.5,
"device_type_probs": {"mobile": 0.75, "desktop": 0.15, "tablet": 0.10},
"is_returning_prob": 0.70,
"channel_probs": {"organic": 0.50, "direct": 0.30, "paid": 0.15, "social": 0.05},
**nuisance_params,
**corr_params[0],
},
# Cluster 1: new-desktop
{
"session_recency_mean": 1.0, "session_recency_std": 1.2,
"browse_depth_mean": 2.5, "browse_depth_std": 1.5,
"device_type_probs": {"mobile": 0.20, "desktop": 0.70, "tablet": 0.10},
"is_returning_prob": 0.15,
"channel_probs": {"organic": 0.35, "paid": 0.35, "social": 0.20, "email": 0.10},
**nuisance_params,
**corr_params[1],
},
# Cluster 2: deal-seekers
{
"session_recency_mean": 0.5, "session_recency_std": 1.5,
"browse_depth_mean": 5.0, "browse_depth_std": 2.0,
"device_type_probs": {"mobile": 0.45, "desktop": 0.40, "tablet": 0.15},
"is_returning_prob": 0.40,
"channel_probs": {"paid": 0.45, "email": 0.30, "organic": 0.15, "social": 0.10},
**nuisance_params,
**corr_params[2],
},
# Cluster 3: inactive
{
"session_recency_mean": -1.5, "session_recency_std": 1.0,
"browse_depth_mean": 1.0, "browse_depth_std": 0.8,
"device_type_probs": {"mobile": 0.35, "desktop": 0.50, "tablet": 0.15},
"is_returning_prob": 0.20,
"channel_probs": {"organic": 0.60, "social": 0.25, "paid": 0.10, "email": 0.05},
**nuisance_params,
**corr_params[3],
},
),
)
def _build_6cluster_ecommerce(
n_nuisance: int = 5, n_correlated: int = 0,
) -> MixtureConfig:
"""Build the 6-cluster MixtureConfig for the clustered_complex template.
Extends 4-cluster with:
4 (12%): premium-tablet — high AOV, tablet-heavy, returning, low browse depth.
5 (8%): social-browsers — social channel, high browse depth, low conversion intent.
Remaining weights re-normalized: (0.25, 0.20, 0.20, 0.15, 0.12, 0.08).
"""
corr_specs, corr_params = _make_correlated_nuisance(6, n_correlated)
nuisance_features = [FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)]
all_feature_specs = tuple(list(_DEFAULT_CORE_FEATURES) + nuisance_features + list(corr_specs))
nuisance_params = {
**{f"z{i}_mean": 0.0 for i in range(n_nuisance)},
**{f"z{i}_std": 1.0 for i in range(n_nuisance)},
}
return MixtureConfig(
features=all_feature_specs,
weights=(0.25, 0.20, 0.20, 0.15, 0.12, 0.08),
cluster_params=(
# Cluster 0: loyal-mobile
{
"session_recency_mean": 2.5, "session_recency_std": 0.8,
"browse_depth_mean": 4.5, "browse_depth_std": 1.5,
"device_type_probs": {"mobile": 0.75, "desktop": 0.15, "tablet": 0.10},
"is_returning_prob": 0.70,
"channel_probs": {"organic": 0.50, "direct": 0.30, "paid": 0.15, "social": 0.05},
**nuisance_params,
**corr_params[0],
},
# Cluster 1: new-desktop
{
"session_recency_mean": 1.0, "session_recency_std": 1.2,
"browse_depth_mean": 2.5, "browse_depth_std": 1.5,
"device_type_probs": {"mobile": 0.20, "desktop": 0.70, "tablet": 0.10},
"is_returning_prob": 0.15,
"channel_probs": {"organic": 0.35, "paid": 0.35, "social": 0.20, "email": 0.10},
**nuisance_params,
**corr_params[1],
},
# Cluster 2: deal-seekers
{
"session_recency_mean": 0.5, "session_recency_std": 1.5,
"browse_depth_mean": 5.0, "browse_depth_std": 2.0,
"device_type_probs": {"mobile": 0.45, "desktop": 0.40, "tablet": 0.15},
"is_returning_prob": 0.40,
"channel_probs": {"paid": 0.45, "email": 0.30, "organic": 0.15, "social": 0.10},
**nuisance_params,
**corr_params[2],
},
# Cluster 3: inactive
{
"session_recency_mean": -1.5, "session_recency_std": 1.0,
"browse_depth_mean": 1.0, "browse_depth_std": 0.8,
"device_type_probs": {"mobile": 0.35, "desktop": 0.50, "tablet": 0.15},
"is_returning_prob": 0.20,
"channel_probs": {"organic": 0.60, "social": 0.25, "paid": 0.10, "email": 0.05},
**nuisance_params,
**corr_params[3],
},
# Cluster 4: premium-tablet
{
"session_recency_mean": 1.5, "session_recency_std": 1.0,
"browse_depth_mean": 2.0, "browse_depth_std": 1.0,
"device_type_probs": {"mobile": 0.15, "desktop": 0.20, "tablet": 0.65},
"is_returning_prob": 0.60,
"channel_probs": {"direct": 0.50, "organic": 0.30, "paid": 0.15, "email": 0.05},
**nuisance_params,
**corr_params[4],
},
# Cluster 5: social-browsers
{
"session_recency_mean": 0.0, "session_recency_std": 1.5,
"browse_depth_mean": 6.0, "browse_depth_std": 2.5,
"device_type_probs": {"mobile": 0.60, "desktop": 0.25, "tablet": 0.15},
"is_returning_prob": 0.25,
"channel_probs": {"social": 0.70, "organic": 0.15, "paid": 0.10, "email": 0.05},
**nuisance_params,
**corr_params[5],
},
),
)
# ---------------------------------------------------------------------------
# Cluster effect profile presets (Task 6.2)
# ---------------------------------------------------------------------------
#: Default 2-cluster profiles for the ``clustered`` template.
#: Cluster 0: positive conversion lift with linear session_recency signal.
#: Cluster 1: negative conversion lift with opposing linear signal.
#: Amplitudes are sized so that per-cluster mean CATEs differ by > RECOVERY (0.1)
#: at the default baseline_rate=0.05 with the default 2-cluster MixtureConfig.
_CLUSTERED_DEFAULT_PROFILES: tuple[ClusterEffectProfile, ...] = (
ClusterEffectProfile(
conversion_delta=+0.60,
severity_delta=+0.15,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.5),
),
severity_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.3),
),
),
ClusterEffectProfile(
conversion_delta=-0.50,
severity_delta=-0.15,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.3),
),
severity_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.2),
),
),
)
#: 4-cluster profiles for ``clustered_realistic``.
#: Clusters have distinct response profiles across conversion and severity.
_CLUSTERED_REALISTIC_PROFILES: tuple[ClusterEffectProfile, ...] = (
# Cluster 0: loyal-mobile — strong positive lift, recency + browse interaction
ClusterEffectProfile(
conversion_delta=+0.30,
severity_delta=+0.15,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.6),
SignalTerm(
signal_type="interaction",
drivers=("session_recency", "browse_depth"),
amplitude=0.3,
),
),
severity_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.4),
),
),
# Cluster 1: new-desktop — moderate negative lift, browse depth signal
ClusterEffectProfile(
conversion_delta=-0.15,
severity_delta=-0.05,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.4),
),
severity_signals=(),
),
# Cluster 2: deal-seekers — near-zero conversion lift, positive severity
ClusterEffectProfile(
conversion_delta=+0.05,
severity_delta=+0.20,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.2),
),
severity_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.5),
),
),
# Cluster 3: inactive — negative lift both channels
ClusterEffectProfile(
conversion_delta=-0.25,
severity_delta=-0.15,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.3),
),
severity_signals=(),
),
)
#: 6-cluster profiles for ``clustered_complex``.
#: All signal types including reversed amplitudes; stress test for deep policy trees.
_CLUSTERED_COMPLEX_PROFILES: tuple[ClusterEffectProfile, ...] = (
# Cluster 0: loyal-mobile — strong positive, recency + browse interaction
ClusterEffectProfile(
conversion_delta=+0.35,
severity_delta=+0.20,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.6),
SignalTerm(
signal_type="interaction",
drivers=("session_recency", "browse_depth"),
amplitude=0.4,
),
),
severity_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.5),
),
),
# Cluster 1: new-desktop — negative linear + quadratic on browse_depth
ClusterEffectProfile(
conversion_delta=-0.20,
severity_delta=-0.10,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.5),
SignalTerm(signal_type="quadratic", drivers=("browse_depth",), amplitude=0.1),
),
severity_signals=(),
),
# Cluster 2: deal-seekers — small positive, step on channel + linear on depth
ClusterEffectProfile(
conversion_delta=+0.10,
severity_delta=+0.25,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.3),
SignalTerm(
signal_type="step",
drivers=("channel",),
amplitude=0.4,
level_values={"paid": 0.8, "organic": -0.2, "direct": 0.0, "social": -0.3},
),
),
severity_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.6),
),
),
# Cluster 3: inactive — negative lift, reversed recency signal
ClusterEffectProfile(
conversion_delta=-0.30,
severity_delta=-0.20,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.4),
),
severity_signals=(),
),
# Cluster 4: premium-tablet — positive lift, interaction with browse depth
ClusterEffectProfile(
conversion_delta=+0.25,
severity_delta=+0.30,
conversion_signals=(
SignalTerm(
signal_type="interaction",
drivers=("session_recency", "browse_depth"),
amplitude=0.5,
),
),
severity_signals=(
SignalTerm(signal_type="quadratic", drivers=("browse_depth",), amplitude=0.2),
),
),
# Cluster 5: social-browsers — near-zero conversion, negative severity (browse fatigue)
ClusterEffectProfile(
conversion_delta=+0.02,
severity_delta=-0.10,
conversion_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.3),
),
severity_signals=(
SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.4),
),
),
)
# ---------------------------------------------------------------------------
# Template surface shapes (Task 3.2)
# ---------------------------------------------------------------------------
def _template_constant(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
revenue_model: str | CartRevenueConfig = "lognormal",
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Constant treatment effect: homogeneous CATE across all visitors."""
intercept = float(logit(baseline_rate))
p0, p1_fn, _ = _logit_additive_surfaces(
intercept, lambda X: np.ones(len(X)),
)
return _build_scenario(
template_name="constant",
p0_surface=p0,
p1_surface_fn=p1_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=sampler,
revenue_model=revenue_model,
)
def _template_reversal(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
driver: str = "session_recency",
driver_target_level: Any | None = None,
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Reversal: CATE flips sign based on the driver feature.
Continuous: h(X) = X[driver] - mean (flips at the population mean).
Default driver session_recency flips at mean = 2.0.
Categorical: h(X) = +0.5 target / -0.5 others (symmetric opposite-sign).
"""
effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance)
driver_info = _resolve_driver(driver, effective_sampler)
intercept = float(logit(baseline_rate))
signal_fn, extra_meta = _build_signal(
driver_info, driver, driver_target_level, "reversal",
cat_target_val=0.5, cat_other_val=-0.5,
)
p0, p1_fn, m1_fn = _logit_additive_surfaces(intercept, signal_fn)
config, metadata = _build_scenario(
template_name="reversal",
p0_surface=p0,
p1_surface_fn=p1_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=sampler,
m1_surface_fn=m1_fn,
driver=driver,
)
metadata.update(extra_meta)
return config, metadata
def _template_sparse_benefit(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
benefit_prevalence: float = 0.15,
sampler: CopulaConfig | MixtureConfig | None = None,
driver: str = "session_recency",
driver_target_level: Any | None = None,
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Sparse benefit: only a small fraction of visitors has positive CATE.
Uses threshold (continuous) or mask (categorical) — not logit-additive.
"""
from scipy import stats
effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance)
driver_info = _resolve_driver(driver, effective_sampler)
extra_meta: dict = {}
p0_surface = _const_surface(baseline_rate)
if driver_info.kind == "continuous":
_mu = driver_info.mean
_std = driver_info.std
cutoff = float(stats.norm(loc=_mu, scale=_std).ppf(1.0 - benefit_prevalence))
def p1_surface_fn(es: float) -> SurfaceConfig:
above_p1 = min(baseline_rate + es * 0.40, 0.99)
return threshold_surface(
SurfaceConfig(fn=lambda X, _d=driver: X[_d].values.astype(float)),
cutoff=cutoff, above=above_p1, below=baseline_rate,
)
else:
_t, _src = _resolve_categorical_target(driver_info, driver_target_level, "sparse_benefit")
extra_meta = {"driver_target_level": _t, "driver_target_source": _src}
def _benefit_mask_fn(X: pd.DataFrame, _d: str = driver, _tgt: Any = _t) -> np.ndarray:
return X[_d].values == _tgt
def p1_surface_fn(es: float) -> SurfaceConfig:
above_p1 = min(baseline_rate + es * 0.40, 0.99)
def _fn(X: pd.DataFrame, _br: float = baseline_rate,
_ap: float = above_p1, _mask_fn=_benefit_mask_fn) -> np.ndarray:
return np.where(_mask_fn(X), _ap, _br).astype(float)
return SurfaceConfig(fn=_fn)
config, metadata = _build_scenario(
template_name="sparse_benefit",
p0_surface=p0_surface,
p1_surface_fn=p1_surface_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=sampler,
driver=driver,
)
metadata["benefit_prevalence"] = benefit_prevalence
metadata.update(extra_meta)
return config, metadata
def _template_interaction_only(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
driver: str = "session_recency",
driver_secondary: str = "browse_depth",
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Interaction-only: CATE dominated by driver×driver_secondary product term."""
effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance)
driver_info = _resolve_driver(driver, effective_sampler)
driver2_info = _resolve_driver(driver_secondary, effective_sampler)
_require_continuous(driver_info, "driver", "interaction_only")
_require_continuous(driver2_info, "driver_secondary", "interaction_only")
intercept = float(logit(baseline_rate))
_mu1 = driver_info.mean
_mu2 = driver2_info.mean
def raw_signal(X, _d=driver, _ds=driver_secondary, _m1=_mu1, _m2=_mu2):
return (
(X[_d].values.astype(float) - _m1) * (X[_ds].values.astype(float) - _m2)
)
p0, p1_fn, _ = _logit_additive_surfaces(intercept, raw_signal)
config, metadata = _build_scenario(
template_name="interaction_only",
p0_surface=p0,
p1_surface_fn=p1_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=sampler,
driver=driver,
)
metadata["driver_secondary"] = driver_secondary
return config, metadata
def _template_nonlinear(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
driver: str = "session_recency",
driver_secondary: str = "browse_depth",
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Nonlinear: quadratic + interaction treatment surface."""
effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance)
driver_info = _resolve_driver(driver, effective_sampler)
driver2_info = _resolve_driver(driver_secondary, effective_sampler)
_require_continuous(driver_info, "driver", "nonlinear")
_require_continuous(driver2_info, "driver_secondary", "nonlinear")
intercept = float(logit(baseline_rate))
_mu = driver_info.mean
_mu2 = driver2_info.mean
def raw_signal(X, _d=driver, _ds=driver_secondary, _m=_mu, _m2=_mu2):
return (
0.5 * (X[_d].values.astype(float) - _m) ** 2
+ 0.5 * (X[_d].values.astype(float) - _m) * (X[_ds].values.astype(float) - _m2)
)
# p0 uses _const_surface(baseline_rate) — mathematically identical to
# sigmoid(logit(baseline_rate)) but preserves the original constant form.
p0_surface = _const_surface(baseline_rate)
_, p1_fn, _ = _logit_additive_surfaces(intercept, raw_signal)
config, metadata = _build_scenario(
template_name="nonlinear",
p0_surface=p0_surface,
p1_surface_fn=p1_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=sampler,
driver=driver,
)
metadata["surface_type"] = "quadratic_interaction"
metadata["driver_secondary"] = driver_secondary
return config, metadata
def _build_clustered_surfaces(
mixture_sampler: MixtureConfig,
profiles: tuple[ClusterEffectProfile, ...],
driver_info_map: dict[str, DriverInfo],
*,
baseline_rate: float,
effect_scale: float,
hte_scale: float,
max_aov_ratio: float = 200.0,
) -> tuple[SurfaceConfig, SurfaceConfig, Callable[[float], SurfaceConfig], Callable[[float], SurfaceConfig]]:
"""Build (p0_surface, m0_surface, p1_surface_fn, m1_surface_fn) for clustered template.
The latent cluster type drives both prognostic outcomes (p0, m0) and treatment
response (p1, m1). Per-cluster prognostic baselines and within-cluster signal
terms live on ``ClusterEffectProfile``. Observed features are noisy windows into
the latent type — BCF only sees features, not cluster_id.
Parameters
----------
mixture_sampler:
MixtureConfig whose cluster_id column is available in X during surface evaluation.
profiles:
Per-cluster effect + prognostic specifications. Must have the same length as
``mixture_sampler.weights``.
driver_info_map:
Resolved DriverInfo for every feature referenced in any SignalTerm across all
profiles (including prognostic signals).
baseline_rate:
Global baseline conversion probability. Per-cluster prognostic_conv_base shifts
are centered so the weighted mean stays at this rate.
effect_scale:
Global CATE magnitude scaling.
hte_scale:
Within-cluster HTE signal amplitude scaling. ``hte_scale=0`` suppresses all
within-cluster treatment gradients; ``hte_scale=1`` applies full amplitudes.
Returns
-------
tuple
``(p0_surface, m0_surface, p1_surface_fn, m1_surface_fn)`` where:
- ``p0_surface`` has per-cluster baselines + within-cluster prognostic signals.
- ``m0_surface`` has per-cluster baselines + within-cluster prognostic signals.
- ``p1_surface_fn(es)`` builds treatment conversion on top of heterogeneous p0.
- ``m1_surface_fn(es)`` builds treatment severity on top of heterogeneous m0.
"""
n_clusters = len(profiles)
_m0_base = 50.0
# Pre-compute per-cluster treatment logit shifts and severity factors.
_conv_shifts: list[float] = [
_relative_lift_to_logit_shift(profiles[k].conversion_delta, baseline_rate)
for k in range(n_clusters)
]
_sev_factors: list[float] = [
_severity_multiplicative_factor(profiles[k].severity_delta)
for k in range(n_clusters)
]
_logit_p0 = float(logit(baseline_rate))
_log_m0 = float(np.log(_m0_base))
def _eval_all_signals(
X: pd.DataFrame,
signals: tuple[SignalTerm, ...],
_hs: float,
_di: dict[str, DriverInfo],
) -> np.ndarray:
"""Sum all signal terms, scaled by hte_scale."""
if not signals or _hs == 0.0:
return np.zeros(len(X), dtype=float)
total = np.zeros(len(X), dtype=float)
for term in signals:
total += _evaluate_signal(term, X, _di)
return _hs * total
def _eval_prognostic_signals(
X: pd.DataFrame,
signals: tuple[SignalTerm, ...],
_di: dict[str, DriverInfo],
) -> np.ndarray:
"""Sum prognostic signal terms (no hte_scale — amplitudes are baked in)."""
if not signals:
return np.zeros(len(X), dtype=float)
total = np.zeros(len(X), dtype=float)
for term in signals:
total += _evaluate_signal(term, X, _di)
return total
# Collect all driver column names used by any signal across all profiles
# (HTE + prognostic), so surface functions can slice only the needed columns.
_driver_cols: set[str] = set()
for prof in profiles:
for sig in (prof.conversion_signals + prof.severity_signals
+ prof.prognostic_conv_signals + prof.prognostic_sev_signals):
_driver_cols.update(sig.drivers)
# Always need cluster_id for masking.
_driver_cols.add("cluster_id")
_driver_col_list: list[str] = sorted(_driver_cols)
# --- p0 surface: per-cluster baselines + within-cluster prognostic signals ---
def _p0_fn(
X: pd.DataFrame,
_lp0: float = _logit_p0,
_profs: tuple[ClusterEffectProfile, ...] = profiles,
_di: dict[str, DriverInfo] = driver_info_map,
_cols: list[str] = _driver_col_list,
) -> np.ndarray:
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
logit_p0 = np.full(len(X), _lp0, dtype=float)
for k, prof in enumerate(_profs):
mask = np.asarray(cid == k)
if not np.any(mask):
continue
# Latent type component: per-cluster baseline shift
logit_p0[mask] += prof.prognostic_conv_base
# Within-cluster feature gradients
logit_p0[mask] += _eval_prognostic_signals(
X_slim.loc[mask], prof.prognostic_conv_signals, _di,
)
return expit(logit_p0)
p0_surface = SurfaceConfig(fn=_p0_fn)
# --- m0 surface: per-cluster baselines + within-cluster prognostic signals ---
# Clamp log-space to [m0_base/ratio, m0_base*ratio] before exp()
_log_m0_floor = float(np.log(_m0_base / max_aov_ratio))
_log_m0_ceil = float(np.log(_m0_base * max_aov_ratio))
def _m0_fn(
X: pd.DataFrame,
_lm0: float = _log_m0,
_profs: tuple[ClusterEffectProfile, ...] = profiles,
_di: dict[str, DriverInfo] = driver_info_map,
_cols: list[str] = _driver_col_list,
_floor: float = _log_m0_floor,
_ceil: float = _log_m0_ceil,
) -> np.ndarray:
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
log_m0 = np.full(len(X), _lm0, dtype=float)
for k, prof in enumerate(_profs):
mask = np.asarray(cid == k)
if not np.any(mask):
continue
log_m0[mask] += prof.prognostic_sev_base
log_m0[mask] += _eval_prognostic_signals(
X_slim.loc[mask], prof.prognostic_sev_signals, _di,
)
np.clip(log_m0, _floor, _ceil, out=log_m0)
return np.exp(log_m0)
m0_surface = SurfaceConfig(fn=_m0_fn)
# --- p1 surface: treatment delta on top of heterogeneous p0 ---
def p1_surface_fn(es: float) -> SurfaceConfig:
def _fn(
X: pd.DataFrame,
_es: float = es,
_hs: float = hte_scale,
_shifts: list[float] = _conv_shifts,
_profs: tuple[ClusterEffectProfile, ...] = profiles,
_di: dict[str, DriverInfo] = driver_info_map,
_cols: list[str] = _driver_col_list,
_p0: Callable = _p0_fn,
) -> np.ndarray:
# Start from individual heterogeneous baselines
p0_vals = _p0(X)
logit_p1 = logit(p0_vals)
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
for k, (shift, prof) in enumerate(zip(_shifts, _profs, strict=False)):
mask = np.asarray(cid == k)
if not np.any(mask):
continue
X_k = X_slim.loc[mask]
within = _eval_all_signals(X_k, prof.conversion_signals, _hs, _di)
logit_p1[mask] += _es * (shift + within)
return expit(logit_p1)
return SurfaceConfig(fn=_fn)
# --- m1 surface: treatment delta on top of heterogeneous m0 ---
def m1_surface_fn(es: float) -> SurfaceConfig:
def _fn(
X: pd.DataFrame,
_es: float = es,
_hs: float = hte_scale,
_factors: list[float] = _sev_factors,
_profs: tuple[ClusterEffectProfile, ...] = profiles,
_di: dict[str, DriverInfo] = driver_info_map,
_cols: list[str] = _driver_col_list,
_m0: Callable = _m0_fn,
) -> np.ndarray:
# Start from individual heterogeneous baselines
m0_vals = _m0(X)
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
m1 = np.copy(m0_vals)
for k, (factor, prof) in enumerate(zip(_factors, _profs, strict=False)):
mask = np.asarray(cid == k)
if not np.any(mask):
continue
X_k = X_slim.loc[mask]
within = _eval_all_signals(X_k, prof.severity_signals, _hs, _di)
cluster_delta = _es * ((factor - 1.0) + within)
m1[mask] = np.maximum(m0_vals[mask] * (1.0 + cluster_delta), 1e-3)
return m1
return SurfaceConfig(fn=_fn)
return p0_surface, m0_surface, p1_surface_fn, m1_surface_fn
def _collect_driver_info_for_profiles(
profiles: tuple[ClusterEffectProfile, ...],
mixture_sampler: MixtureConfig,
) -> dict[str, DriverInfo]:
"""Resolve DriverInfo for all features referenced in any SignalTerm across all profiles."""
driver_names: set[str] = set()
for prof in profiles:
for term in (prof.conversion_signals + prof.severity_signals
+ prof.prognostic_conv_signals + prof.prognostic_sev_signals):
driver_names.update(term.drivers)
return {name: _resolve_driver(name, mixture_sampler) for name in driver_names}
def _template_clustered(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
n_correlated: int = 0,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
hte_scale: float = 1.0,
driver: str = "session_recency",
driver_target_level: Any | None = None,
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Clustered: generalized per-cluster logit-additive surfaces with 2-cluster MixtureConfig.
Uses ``_CLUSTERED_DEFAULT_PROFILES`` and ``_build_clustered_sampler`` to build
surfaces via the generalized ``_build_clustered_surfaces`` helper.
Parameters
----------
hte_scale:
Within-cluster signal amplitude scaling. ``0`` = step function (cluster-level
deltas only); ``1`` = full within-cluster gradients active.
n_correlated:
Number of cluster-correlated nuisance features (default 0).
"""
if sampler is not None:
raise ValueError(
"clustered template uses its own MixtureConfig sampler; "
"external sampler= override is not supported"
)
mixture_sampler = _build_clustered_sampler(n_nuisance, n_correlated)
profiles = _CLUSTERED_DEFAULT_PROFILES
driver_info_map = _collect_driver_info_for_profiles(profiles, mixture_sampler)
p0_surface, _m0_surf, p1_surface_fn, m1_surface_fn = _build_clustered_surfaces(
mixture_sampler,
profiles,
driver_info_map,
baseline_rate=baseline_rate,
effect_scale=effect_scale,
hte_scale=hte_scale,
)
config, metadata = _build_scenario(
template_name="clustered",
p0_surface=p0_surface,
p1_surface_fn=p1_surface_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=mixture_sampler,
m1_surface_fn=m1_surface_fn,
driver=driver,
)
metadata["n_clusters"] = len(mixture_sampler.weights)
metadata["hte_scale"] = hte_scale
metadata["n_correlated"] = n_correlated
return config, metadata
def _build_clustered_surfaces_marm(
mixture_sampler: MixtureConfig,
profiles: tuple[ClusterEffectProfile, ...],
driver_info_map: dict[str, DriverInfo],
*,
baseline_rate: float,
effect_scale: float,
hte_scale: float,
K: int,
max_aov_ratio: float = 200.0,
) -> tuple[
list[SurfaceConfig],
list[SurfaceConfig] | None,
list[SurfaceConfig] | None,
]:
"""Build K per-treatment-level potential-outcome surfaces for K-arm clustered_realistic.
Constructs:
- ``p_surfaces[0]``: heterogeneous control conversion (identical to _build_clustered_surfaces p0).
- ``p_surfaces[k]`` for k=1..K-1: treatment-k conversion built on top of p0.
- ``m_surfaces[0]``: heterogeneous control severity (from per-cluster baselines).
- ``m_surfaces[k]`` for k=1..K-1: treatment-k severity built on top of m0.
- ``sigma_surfaces[k]``: constant severity_sigma for all k (hardcoded 0.5 by default;
the caller passes severity_sigma as part of surface construction).
Treatment-response scheme:
For treatment level t (t = 1..K-1), the per-cluster responses are derived from
_CLUSTERED_REALISTIC_PROFILES by:
1. Cyclic-shifting the base cluster assignment by s = (t-1) % n_clusters,
so treatment t at cluster c uses base profile[(c - s) % n_clusters]'s
treatment signals. This gives genuinely independent CATE patterns across
treatments.
2. Scaling the per-cluster conversion_delta and severity_delta by mag_t =
linspace(0.7, 1.3, K-1)[t-1], ensuring treatments differ even when shifts
repeat at high K.
Parameters
----------
mixture_sampler:
MixtureConfig whose cluster_id column is available in X during surface evaluation.
profiles:
Base per-cluster profiles (length = n_clusters = 4 for clustered_realistic).
driver_info_map:
Resolved DriverInfo for all signal drivers.
baseline_rate:
Global baseline conversion probability.
effect_scale:
Global CATE magnitude scaling.
hte_scale:
Within-cluster signal amplitude scaling.
K:
Number of treatment levels (K >= 3).
max_aov_ratio:
Clamp ratio for severity log-space.
Returns
-------
(p_surfaces, m_surfaces, sigma_surfaces)
Each is a list of K SurfaceConfig objects. For conversion_rate callers,
m_surfaces and sigma_surfaces are None — callers should pass severity_sigma
to build sigma_surfaces if needed. Here we always return all three; the
caller drops m_surfaces/sigma_surfaces when metric_id == "conversion_rate".
"""
n_clusters = len(profiles)
_m0_base = 50.0
_log_m0 = float(np.log(_m0_base))
_logit_p0 = float(logit(baseline_rate))
_log_m0_floor = float(np.log(_m0_base / max_aov_ratio))
_log_m0_ceil = float(np.log(_m0_base * max_aov_ratio))
# Per-treatment magnitude scaling (endpoints are exact, so K=3 yields
# exactly [0.7, 1.3]).
_mags = np.linspace(0.7, 1.3, K - 1)
# Derive K-1 treatment-response tuples (one per treatment level t=1..K-1).
# Each tuple: (per_cluster_conv_shifts, per_cluster_sev_factors, per_cluster_signals)
# per_cluster_conv_shifts[c] = logit shift for cluster c under treatment t.
# per_cluster_sev_factors[c] = multiplicative severity factor for cluster c.
# per_cluster_conv_signals[c] = tuple of conversion SignalTerms for cluster c.
# per_cluster_sev_signals[c] = tuple of severity SignalTerms for cluster c.
_treatment_responses: list[
tuple[list[float], list[float], list[tuple[SignalTerm, ...]], list[tuple[SignalTerm, ...]]]
] = []
for t in range(1, K):
mag = float(_mags[t - 1])
s = (t - 1) % n_clusters
conv_shifts: list[float] = []
sev_factors: list[float] = []
conv_sigs: list[tuple[SignalTerm, ...]] = []
sev_sigs: list[tuple[SignalTerm, ...]] = []
for c in range(n_clusters):
src = (c - s) % n_clusters
base_prof = profiles[src]
scaled_delta = base_prof.conversion_delta * mag
scaled_sev = base_prof.severity_delta * mag
conv_shifts.append(
_relative_lift_to_logit_shift(scaled_delta, baseline_rate)
)
sev_factors.append(
_severity_multiplicative_factor(
max(scaled_sev, -0.9) # clamp so factor stays positive
)
)
conv_sigs.append(base_prof.conversion_signals)
sev_sigs.append(base_prof.severity_signals)
_treatment_responses.append((conv_shifts, sev_factors, conv_sigs, sev_sigs))
# --- Collect driver cols needed (union across all profiles) ---
_driver_cols: set[str] = set()
for prof in profiles:
for sig in (prof.conversion_signals + prof.severity_signals
+ prof.prognostic_conv_signals + prof.prognostic_sev_signals):
_driver_cols.update(sig.drivers)
_driver_cols.add("cluster_id")
_driver_col_list: list[str] = sorted(_driver_cols)
# --- Inline helpers (replicated from _build_clustered_surfaces, not shared) ---
def _eval_all_signals_marm(
X: pd.DataFrame,
signals: tuple[SignalTerm, ...],
_hs: float,
_di: dict[str, DriverInfo],
) -> np.ndarray:
if not signals or _hs == 0.0:
return np.zeros(len(X), dtype=float)
total = np.zeros(len(X), dtype=float)
for term in signals:
total += _evaluate_signal(term, X, _di)
return _hs * total
def _eval_prognostic_signals_marm(
X: pd.DataFrame,
signals: tuple[SignalTerm, ...],
_di: dict[str, DriverInfo],
) -> np.ndarray:
if not signals:
return np.zeros(len(X), dtype=float)
total = np.zeros(len(X), dtype=float)
for term in signals:
total += _evaluate_signal(term, X, _di)
return total
# --- Control surfaces (k=0): same construction as _build_clustered_surfaces p0/m0 ---
def _p0_fn_marm(
X: pd.DataFrame,
_lp0: float = _logit_p0,
_profs: tuple[ClusterEffectProfile, ...] = profiles,
_di: dict[str, DriverInfo] = driver_info_map,
_cols: list[str] = _driver_col_list,
) -> np.ndarray:
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
logit_p0 = np.full(len(X), _lp0, dtype=float)
for k, prof in enumerate(_profs):
mask = np.asarray(cid == k)
if not np.any(mask):
continue
logit_p0[mask] += prof.prognostic_conv_base
logit_p0[mask] += _eval_prognostic_signals_marm(
X_slim.loc[mask], prof.prognostic_conv_signals, _di,
)
return expit(logit_p0)
p0_surface = SurfaceConfig(fn=_p0_fn_marm)
def _m0_fn_marm(
X: pd.DataFrame,
_lm0: float = _log_m0,
_profs: tuple[ClusterEffectProfile, ...] = profiles,
_di: dict[str, DriverInfo] = driver_info_map,
_cols: list[str] = _driver_col_list,
_floor: float = _log_m0_floor,
_ceil: float = _log_m0_ceil,
) -> np.ndarray:
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
log_m0 = np.full(len(X), _lm0, dtype=float)
for k, prof in enumerate(_profs):
mask = np.asarray(cid == k)
if not np.any(mask):
continue
log_m0[mask] += prof.prognostic_sev_base
log_m0[mask] += _eval_prognostic_signals_marm(
X_slim.loc[mask], prof.prognostic_sev_signals, _di,
)
np.clip(log_m0, _floor, _ceil, out=log_m0)
return np.exp(log_m0)
m0_surface = SurfaceConfig(fn=_m0_fn_marm)
# --- Treatment surfaces (k=1..K-1) ---
p_treatment_surfaces: list[SurfaceConfig] = []
m_treatment_surfaces: list[SurfaceConfig] = []
for t in range(1, K):
t_conv_shifts, t_sev_factors, t_conv_sigs, t_sev_sigs = _treatment_responses[t - 1]
def _make_pk_surface(
_es: float,
_hs: float,
_shifts: list[float],
_csigs: list[tuple[SignalTerm, ...]],
_di: dict[str, DriverInfo],
_cols: list[str],
_p0_fn: Callable[[pd.DataFrame], np.ndarray],
_nc: int,
) -> SurfaceConfig:
def _fn(
X: pd.DataFrame,
_es: float = _es,
_hs: float = _hs,
_shifts: list[float] = _shifts,
_csigs: list[tuple[SignalTerm, ...]] = _csigs,
_di: dict[str, DriverInfo] = _di,
_cols: list[str] = _cols,
_p0_fn: Callable[[pd.DataFrame], np.ndarray] = _p0_fn,
_nc: int = _nc,
) -> np.ndarray:
p0_vals = _p0_fn(X)
logit_pk = logit(p0_vals)
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
for c in range(_nc):
mask = np.asarray(cid == c)
if not np.any(mask):
continue
X_c = X_slim.loc[mask]
within = _eval_all_signals_marm(X_c, _csigs[c], _hs, _di)
logit_pk[mask] += _es * (_shifts[c] + within)
return expit(logit_pk)
return SurfaceConfig(fn=_fn)
p_treatment_surfaces.append(_make_pk_surface(
effect_scale, hte_scale, t_conv_shifts, t_conv_sigs,
driver_info_map, _driver_col_list, _p0_fn_marm, n_clusters,
))
def _make_mk_surface(
_es: float,
_hs: float,
_factors: list[float],
_ssigs: list[tuple[SignalTerm, ...]],
_di: dict[str, DriverInfo],
_cols: list[str],
_m0_fn: Callable[[pd.DataFrame], np.ndarray],
_nc: int,
) -> SurfaceConfig:
def _fn(
X: pd.DataFrame,
_es: float = _es,
_hs: float = _hs,
_factors: list[float] = _factors,
_ssigs: list[tuple[SignalTerm, ...]] = _ssigs,
_di: dict[str, DriverInfo] = _di,
_cols: list[str] = _cols,
_m0_fn: Callable[[pd.DataFrame], np.ndarray] = _m0_fn,
_nc: int = _nc,
) -> np.ndarray:
m0_vals = _m0_fn(X)
X_slim = cast(pd.DataFrame, X[_cols])
cid = np.asarray(X_slim["cluster_id"])
mk = np.copy(m0_vals)
for c in range(_nc):
mask = np.asarray(cid == c)
if not np.any(mask):
continue
X_c = X_slim.loc[mask]
within = _eval_all_signals_marm(X_c, _ssigs[c], _hs, _di)
cluster_delta = _es * ((_factors[c] - 1.0) + within)
mk[mask] = np.maximum(m0_vals[mask] * (1.0 + cluster_delta), 1e-3)
return mk
return SurfaceConfig(fn=_fn)
m_treatment_surfaces.append(_make_mk_surface(
effect_scale, hte_scale, t_sev_factors, t_sev_sigs,
driver_info_map, _driver_col_list, _m0_fn_marm, n_clusters,
))
p_surfaces: list[SurfaceConfig] = [p0_surface] + p_treatment_surfaces
m_surfaces_list: list[SurfaceConfig] = [m0_surface] + m_treatment_surfaces
return p_surfaces, m_surfaces_list, None # sigma_surfaces built by caller
def _template_clustered_realistic(
metric_id: str,
effect_scale: float,
*,
K: int = 2,
n_visitors: int = 1000,
n_nuisance: int = 5,
n_correlated: int = 0,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
hte_scale: float = 1.0,
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Clustered realistic: 4-cluster MixtureConfig with additive within-cluster signals.
Primary scenario for sequential targeting validation. Uses linear and
interaction signal terms across the 4 e-commerce segments.
Parameters
----------
K:
Number of treatment levels (control + K−1 treatments). ``K=2`` (default)
preserves the existing binary shape (bit-identical to the pre-change
implementation). ``K>=3`` produces a multi-treatment config carrying one
potential-outcome surface per treatment level.
hte_scale:
Within-cluster signal amplitude scaling. ``0`` = step function; ``1`` = full gradients.
n_correlated:
Number of cluster-correlated nuisance features (default 0).
"""
if K < 2:
raise ValueError(
f"clustered_realistic: K must be >= 2, got K={K}. "
"K < 2 is not a valid experiment."
)
if sampler is not None:
raise ValueError(
"clustered_realistic template uses its own MixtureConfig sampler; "
"external sampler= override is not supported"
)
mixture_sampler = _build_4cluster_ecommerce(n_nuisance, n_correlated)
profiles = _CLUSTERED_REALISTIC_PROFILES
driver_info_map = _collect_driver_info_for_profiles(profiles, mixture_sampler)
if K == 2:
# Existing K=2 path — unchanged, bit-identical to pre-change implementation.
p0_surface, _m0_surf, p1_surface_fn, m1_surface_fn = _build_clustered_surfaces(
mixture_sampler,
profiles,
driver_info_map,
baseline_rate=baseline_rate,
effect_scale=effect_scale,
hte_scale=hte_scale,
)
config, metadata = _build_scenario(
template_name="clustered_realistic",
p0_surface=p0_surface,
p1_surface_fn=p1_surface_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=mixture_sampler,
m1_surface_fn=m1_surface_fn,
driver="session_recency",
)
metadata["n_clusters"] = len(mixture_sampler.weights)
metadata["hte_scale"] = hte_scale
metadata["n_correlated"] = n_correlated
return config, metadata
# K >= 3: multi-arm path.
p_surfaces, m_surfaces_all, _ = _build_clustered_surfaces_marm(
mixture_sampler,
profiles,
driver_info_map,
baseline_rate=baseline_rate,
effect_scale=effect_scale,
hte_scale=hte_scale,
K=K,
)
# Build uniform arm allocation: [1/K, ..., 1/K], last entry normalized to sum to 1.
_frac = 1.0 / K
arm_alloc_list = [_frac] * (K - 1)
arm_alloc_list.append(1.0 - sum(arm_alloc_list))
treatment_probabilities = tuple(arm_alloc_list)
if metric_id == "conversion_rate":
metric_mode = MetricMode(
metric_id="conversion_rate",
p_surfaces=p_surfaces,
)
elif metric_id == "revenue_per_visitor":
sigma_surfaces = [_const_surface(severity_sigma) for _ in range(K)]
assert m_surfaces_all is not None
metric_mode = MetricMode(
metric_id="revenue_per_visitor",
p_surfaces=p_surfaces,
m_surfaces=m_surfaces_all,
sigma_surfaces=sigma_surfaces,
)
else:
raise ValueError(
f"clustered_realistic: unsupported metric_id {metric_id!r}. "
"Must be 'conversion_rate' or 'revenue_per_visitor'."
)
assignment = AssignmentConfig(treatment_probabilities=treatment_probabilities)
config = V2GeneratorConfig(
n_visitors=n_visitors,
feature_sampler=mixture_sampler,
metric_mode=metric_mode,
assignment=assignment,
seed=seed,
experiment_id=f"clustered_realistic-es{effect_scale}-seed{seed}-K{K}",
revenue_model=None,
)
metadata: dict[str, str | int | float] = {
"scenario_class": "clustered_realistic",
"template": "clustered_realistic",
"metric_id": metric_id,
"effect_scale": effect_scale,
"n_visitors": n_visitors,
"seed": seed,
"baseline_rate": baseline_rate,
"n_nuisance": n_nuisance,
"revenue_model": "lognormal",
"driver": "session_recency",
"n_clusters": len(mixture_sampler.weights),
"hte_scale": hte_scale,
"n_correlated": n_correlated,
"K": K,
}
# NOTE: no scalar "treatment_allocation" key at K>=3 — the fixed forced
# per-treatment assignment lives on config.assignment.treatment_probabilities.
# (A sequential experiment routes treatments by cell policy, not this vector.)
return config, metadata
def _template_clustered_complex(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
n_correlated: int = 0,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
hte_scale: float = 1.0,
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Clustered complex: 6-cluster MixtureConfig with all signal types.
Stress test for deep policy trees. Includes linear, interaction, and
quadratic signal terms, with reversed amplitudes across clusters.
Parameters
----------
hte_scale:
Within-cluster signal amplitude scaling. ``0`` = step function; ``1`` = full gradients.
n_correlated:
Number of cluster-correlated nuisance features (default 0).
"""
if sampler is not None:
raise ValueError(
"clustered_complex template uses its own MixtureConfig sampler; "
"external sampler= override is not supported"
)
mixture_sampler = _build_6cluster_ecommerce(n_nuisance, n_correlated)
profiles = _CLUSTERED_COMPLEX_PROFILES
driver_info_map = _collect_driver_info_for_profiles(profiles, mixture_sampler)
p0_surface, _m0_surf, p1_surface_fn, m1_surface_fn = _build_clustered_surfaces(
mixture_sampler,
profiles,
driver_info_map,
baseline_rate=baseline_rate,
effect_scale=effect_scale,
hte_scale=hte_scale,
)
config, metadata = _build_scenario(
template_name="clustered_complex",
p0_surface=p0_surface,
p1_surface_fn=p1_surface_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=mixture_sampler,
m1_surface_fn=m1_surface_fn,
driver="session_recency",
)
metadata["n_clusters"] = len(mixture_sampler.weights)
metadata["hte_scale"] = hte_scale
metadata["n_correlated"] = n_correlated
return config, metadata
def _template_monotone_gradient(
metric_id: str,
effect_scale: float,
*,
n_visitors: int = 1000,
n_nuisance: int = 5,
treatment_allocation: float = 0.5,
seed: int = 42,
baseline_rate: float = 0.05,
severity_sigma: float = 0.5,
sampler: CopulaConfig | MixtureConfig | None = None,
revenue_model: str | CartRevenueConfig = "lognormal",
channel_mode: str | None = None,
prognostic_features: list[str] | None = None,
prescriptive_features: list[str] | None = None,
driver: str = "session_recency",
driver_target_level: str | None = None,
severity_driver: str = "browse_depth",
**_kwargs,
) -> tuple[V2GeneratorConfig, dict]:
"""Monotone gradient: CATE increases smoothly with the driver feature.
Continuous: h(X) = 2.0 × (X[driver] - mean) (dose-response curve).
Categorical: h(X) = +1.0 target / 0.0 others (step function).
"""
effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance)
driver_info = _resolve_driver(driver, effective_sampler)
intercept = float(logit(baseline_rate))
signal_fn, extra_meta = _build_signal(
driver_info, driver, driver_target_level, "monotone_gradient",
cat_target_val=1.0, cat_other_val=0.0,
)
p0, p1_fn, m1_fn = _logit_additive_surfaces(
intercept, signal_fn, conversion_slope=2.0,
)
config, metadata = _build_scenario(
template_name="monotone_gradient",
p0_surface=p0,
p1_surface_fn=p1_fn,
metric_id=metric_id,
effect_scale=effect_scale,
severity_sigma=severity_sigma,
n_visitors=n_visitors,
n_nuisance=n_nuisance,
treatment_allocation=treatment_allocation,
seed=seed,
baseline_rate=baseline_rate,
sampler=sampler,
revenue_model=revenue_model,
m1_surface_fn=m1_fn,
channel_mode=channel_mode,
prognostic_features=prognostic_features,
prescriptive_features=prescriptive_features,
driver=driver,
severity_driver=severity_driver,
)
metadata.update(extra_meta)
return config, metadata
# ---------------------------------------------------------------------------
# TEMPLATES dict (Task 3.2)
# ---------------------------------------------------------------------------
#: Registered scenario templates.
#: Keys are template names; values are callables accepting
#: ``(metric_id: str, effect_scale: float, **kwargs)`` and returning
#: ``(V2GeneratorConfig, dict)``.
TEMPLATES: dict[str, Callable[..., tuple[V2GeneratorConfig, dict]]] = {
"constant": _template_constant,
"reversal": _template_reversal,
"sparse_benefit": _template_sparse_benefit,
"nonlinear": _template_nonlinear,
"monotone_gradient": _template_monotone_gradient,
"interaction_only": _template_interaction_only,
"clustered": _template_clustered,
"clustered_realistic": _template_clustered_realistic,
"clustered_complex": _template_clustered_complex,
}
# ---------------------------------------------------------------------------
# Canonical sweep contract type alias
# ---------------------------------------------------------------------------
#: Tuple of ``(scenario_fn, kwargs)``. Each pair can be invoked as
#: ``scenario_fn(seed=seed, **kwargs)`` to produce a
#: ``(V2GeneratorConfig, dict)`` result. The kwargs dict MUST NOT include
#: ``seed`` — seed is injected by ``run_suite_parallel``.
ScenarioSpec = tuple[Callable[..., tuple[V2GeneratorConfig, dict]], dict[str, Any]]
# ---------------------------------------------------------------------------
# build_catalog (Tasks 18.1, 9.2)
# ---------------------------------------------------------------------------
# Hurdle metric IDs — channel_mode applies only to these.
_HURDLE_METRICS = {"revenue_per_visitor"}
# Marginal sweep relevance mapping:
# key → (reference_templates, reference_metrics, kwarg_name)
# "both" = all metric_ids provided by the caller.
_MARGINAL_SWEEP_REF: dict[str, tuple[str, str]] = {
# (reference_template, reference_metric_set)
# Pairs are (template_name, "hurdle" | "both")
"severity_sigma_grid": ("constant", "hurdle"),
"n_nuisance_grid": ("constant", "both"),
"n_visitors_grid": ("constant", "both"),
"treatment_allocation_grid": ("constant", "both"),
"benefit_prevalence_grid": ("sparse_benefit", "both"),
"baseline_rate_grid": ("constant", "both"),
"revenue_model_grid": ("constant", "hurdle"),
}
# Maps each marginal grid parameter name to the template kwarg name it controls.
_MARGINAL_GRID_TO_KWARG: dict[str, str] = {
"severity_sigma_grid": "severity_sigma",
"n_nuisance_grid": "n_nuisance",
"n_visitors_grid": "n_visitors",
"treatment_allocation_grid": "treatment_allocation",
"benefit_prevalence_grid": "benefit_prevalence",
"baseline_rate_grid": "baseline_rate",
"revenue_model_grid": "revenue_model",
}
[docs]
def build_catalog(
*,
templates: list[str] | None = None,
metric_ids: list[str] | None = None,
effect_scale_grid: list[float],
# Optional marginal sweep grids
severity_sigma_grid: list[float] | None = None,
n_nuisance_grid: list[int] | None = None,
n_visitors_grid: list[int] | None = None,
treatment_allocation_grid: list[float] | None = None,
benefit_prevalence_grid: list[float] | None = None,
baseline_rate_grid: list[float] | None = None,
revenue_model_grid: list[Any] | None = None,
channel_mode_grid: list[str] | None = None,
# Defaults for non-swept dimensions
default_effect_scale: float = 1.0,
default_n_visitors: int = 1000,
default_severity_sigma: float = 0.5,
default_n_nuisance: int = 5,
default_treatment_allocation: float = 0.5,
default_benefit_prevalence: float = 0.15,
default_baseline_rate: float = 0.05,
# Driver coverage
categorical_driver: str | None = None,
) -> list[ScenarioSpec]:
"""Build a star-sweep catalog of ``(scenario_fn, kwargs)`` tuples.
Parameters
----------
templates
Template names to include. Defaults to all 7.
metric_ids
Metric IDs to include. Defaults to both ``"conversion_rate"`` and
``"revenue_per_visitor"``.
effect_scale_grid
Required. Core grid dimension — all templates × metrics are
crossed with every value in this list.
severity_sigma_grid
Optional marginal sweep grid for ``severity_sigma``.
n_nuisance_grid
Optional marginal sweep grid for ``n_nuisance``.
n_visitors_grid
Optional marginal sweep grid for ``n_visitors``.
treatment_allocation_grid
Optional marginal sweep grid for ``treatment_allocation``.
benefit_prevalence_grid
Optional marginal sweep grid for ``benefit_prevalence``.
baseline_rate_grid
Optional marginal sweep grid for ``baseline_rate``.
revenue_model_grid
Optional marginal sweep grid for revenue-model variants.
channel_mode_grid
For hurdle metrics, crosses channel_mode into the core grid.
Defaults to ``["mirror"]`` for backward compatibility.
default_effect_scale
``effect_scale`` used for marginal sweeps (default 1.0).
default_n_visitors
Default visitor count for non-swept dimensions.
default_severity_sigma
Default severity sigma for non-swept dimensions.
default_n_nuisance
Default nuisance-feature count for non-swept dimensions.
default_treatment_allocation
Default treatment allocation for non-swept dimensions.
default_benefit_prevalence
Default benefit prevalence for non-swept dimensions.
default_baseline_rate
Default baseline rate for non-swept dimensions.
categorical_driver
When provided, one additional ``reversal`` spec with this driver is
appended per metric family (binary and hurdle) before the coverage
gate is validated. Must be a categorical or binary feature from
the default schema (e.g. ``"is_returning"``). Named profiles set
this to ``"is_returning"`` so they satisfy the coverage gate by
construction.
Returns
-------
list of ScenarioSpec
List of ``(scenario_fn, kwargs)`` tuples consumable by
``run_suite_parallel``. The kwargs dicts do NOT include ``seed``.
Raises
------
ValueError
If the assembled spec list does not include at least one
continuous-driver and one categorical-driver (binary counts as
categorical) scenario per metric family.
"""
if templates is None:
templates = list(TEMPLATES.keys())
if metric_ids is None:
metric_ids = ["conversion_rate", "revenue_per_visitor"]
_channel_mode_grid = channel_mode_grid if channel_mode_grid is not None else ["mirror"]
# Base kwargs shared by all entries (defaults for non-swept dimensions).
_base_kwargs: dict[str, Any] = {
"n_visitors": default_n_visitors,
"severity_sigma": default_severity_sigma,
"n_nuisance": default_n_nuisance,
"treatment_allocation": default_treatment_allocation,
"benefit_prevalence": default_benefit_prevalence,
"baseline_rate": default_baseline_rate,
}
specs: list[ScenarioSpec] = []
# ------------------------------------------------------------------
# Core grid: template × metric_id × effect_scale
# For hurdle metrics, also cross with channel_mode_grid.
# ------------------------------------------------------------------
for tmpl_name in templates:
tmpl_fn = TEMPLATES[tmpl_name]
for metric_id in metric_ids:
for es in effect_scale_grid:
if metric_id in _HURDLE_METRICS:
# Cross with channel_mode_grid for hurdle.
for cm in _channel_mode_grid:
kwargs: dict[str, Any] = {
**_base_kwargs,
"metric_id": metric_id,
"effect_scale": es,
"channel_mode": cm,
}
specs.append((tmpl_fn, kwargs))
else:
kwargs = {
**_base_kwargs,
"metric_id": metric_id,
"effect_scale": es,
}
specs.append((tmpl_fn, kwargs))
# ------------------------------------------------------------------
# Marginal sweeps: one dimension at a time, against relevant ref.
# ------------------------------------------------------------------
marginal_grids: dict[str, list[Any] | None] = {
"severity_sigma_grid": severity_sigma_grid,
"n_nuisance_grid": n_nuisance_grid,
"n_visitors_grid": n_visitors_grid,
"treatment_allocation_grid": treatment_allocation_grid,
"benefit_prevalence_grid": benefit_prevalence_grid,
"baseline_rate_grid": baseline_rate_grid,
"revenue_model_grid": revenue_model_grid,
}
for grid_name, grid_values in marginal_grids.items():
if grid_values is None:
continue
ref_tmpl_name, ref_metric_scope = _MARGINAL_SWEEP_REF[grid_name]
kwarg_name = _MARGINAL_GRID_TO_KWARG[grid_name]
# Determine which metrics this sweep runs against.
if ref_metric_scope == "hurdle":
ref_metrics = [m for m in metric_ids if m in _HURDLE_METRICS]
else:
ref_metrics = list(metric_ids)
# Skip if the reference template is not in the requested templates.
if ref_tmpl_name not in templates:
continue
ref_tmpl_fn = TEMPLATES[ref_tmpl_name]
for value in grid_values:
# Skip if this value matches the default (already in core grid).
default_val = _base_kwargs.get(kwarg_name)
if default_val is not None and value == default_val:
continue
for metric_id in ref_metrics:
kwargs = {
**_base_kwargs,
"metric_id": metric_id,
"effect_scale": default_effect_scale,
kwarg_name: value,
}
# For hurdle marginal sweeps, include default channel_mode.
if metric_id in _HURDLE_METRICS:
kwargs["channel_mode"] = _channel_mode_grid[0]
specs.append((ref_tmpl_fn, kwargs))
# ------------------------------------------------------------------
# Categorical driver injection (D5): when categorical_driver is
# provided, add one reversal spec per metric family with that driver.
# This ensures profiles satisfy the coverage gate by construction.
# ------------------------------------------------------------------
if categorical_driver is not None:
# Validate the categorical driver against the default sampler.
_cat_sampler = _build_default_mixture_sampler(default_n_nuisance)
cat_info = _resolve_driver(categorical_driver, _cat_sampler)
if cat_info.kind == "continuous":
raise ValueError(
f"build_catalog: categorical_driver={categorical_driver!r} resolved as "
f"continuous. Must be a categorical or binary feature."
)
assert cat_info.levels is not None # non-continuous drivers carry levels
cat_target = cat_info.levels[0]
for metric_id in metric_ids:
cat_kwargs: dict[str, Any] = {
**_base_kwargs,
"metric_id": metric_id,
"effect_scale": default_effect_scale,
"driver": categorical_driver,
"driver_target_level": cat_target,
}
if metric_id in _HURDLE_METRICS:
cat_kwargs["channel_mode"] = _channel_mode_grid[0]
specs.append((_template_reversal, cat_kwargs))
# ------------------------------------------------------------------
# Coverage gate (D5): validate that the assembled specs include at
# least one continuous-driver and one categorical-driver (binary counts
# as categorical) scenario per metric family.
# ------------------------------------------------------------------
_coverage: dict[str, set[str]] = {
"binary": set(), # metric family: metric_id NOT in _HURDLE_METRICS
"hurdle": set(), # metric family: metric_id IN _HURDLE_METRICS
}
_default_sampler = _build_default_mixture_sampler(default_n_nuisance)
for fn, kw in specs:
metric_id = kw.get("metric_id", "")
family = "hurdle" if metric_id in _HURDLE_METRICS else "binary"
# Determine effective driver and kind.
explicit_driver = kw.get("driver")
if explicit_driver is not None:
_drv_info = _resolve_driver(explicit_driver, _default_sampler)
driver_coverage_kind = "categorical" if _drv_info.kind == "categorical" else "continuous"
_coverage[family].add(driver_coverage_kind)
elif fn is _template_constant:
# constant has no driver — excluded from coverage check.
pass
elif fn is _template_clustered:
# clustered uses session_recency internally → continuous.
_coverage[family].add("continuous")
else:
# All other templates default to session_recency (continuous).
_coverage[family].add("continuous")
# Only validate metric families that are actually present in the specs.
_families_present = {
("hurdle" if kw.get("metric_id", "") in _HURDLE_METRICS else "binary")
for _, kw in specs
}
for family in _families_present:
missing = {"continuous", "categorical"} - _coverage[family]
if missing:
raise ValueError(
f"build_catalog: insufficient driver coverage for {family!r} metric family. "
f"Missing driver kinds: {sorted(missing)}. "
"The catalog must include at least one continuous-driver and one "
"categorical-driver (binary counts as categorical) scenario per metric family. "
"Use categorical_driver='is_returning' or include templates with categorical drivers."
)
return specs
# ---------------------------------------------------------------------------
# Named profiles — preset catalog configurations for build_catalog()
# ---------------------------------------------------------------------------
#: Default 3-category cart config for the thorough profile's revenue_model_grid.
_DEFAULT_CART_CONFIG = CartRevenueConfig(
categories=[
ProductCategory(name="budget", base_price=15.0, price_std=3.0, base_purchase_prob=0.6),
ProductCategory(name="mid", base_price=45.0, price_std=8.0, base_purchase_prob=0.3),
ProductCategory(name="premium", base_price=120.0, price_std=20.0, base_purchase_prob=0.1),
]
)
PROFILES: dict[str, dict] = {
# ------------------------------------------------------------------
# validate — minimal smoke test: 1 effect_scale, no marginals, small n.
# Core grid: 7 templates × 2 metrics × 1 es = 14 binary + 14 hurdle
# = 14 binary + 14 hurdle (mirror) = 28 total.
# categorical_driver ensures the coverage gate is satisfied by construction.
# ------------------------------------------------------------------
"validate": {
"effect_scale_grid": [1.0],
"default_n_visitors": 200,
"categorical_driver": "is_returning",
},
# ------------------------------------------------------------------
# standard — routine development calibration, ~50 configs.
# Core: 7 × 2 × 3 es = 42 (binary) + 42 (hurdle) = 84 core.
# Marginals add ~14–20 more via severity_sigma, n_nuisance, n_visitors,
# and baseline_rate_grid sweeps.
# categorical_driver ensures the coverage gate is satisfied by construction.
# ------------------------------------------------------------------
"standard": {
"effect_scale_grid": [0.5, 1.0, 2.0],
"severity_sigma_grid": [0.3, 1.0],
"n_nuisance_grid": [0, 10],
"n_visitors_grid": [500, 2000],
"baseline_rate_grid": [0.02, 0.05, 0.15],
"default_n_visitors": 1000,
"categorical_driver": "is_returning",
},
# ------------------------------------------------------------------
# thorough — pre-release dense coverage, ~150 configs.
# Denser effect_scale grid + all marginal dimensions including
# channel_mode_grid, treatment_allocation, benefit_prevalence,
# and revenue_model_grid.
# categorical_driver ensures the coverage gate is satisfied by construction.
# ------------------------------------------------------------------
"thorough": {
"effect_scale_grid": [0.25, 0.5, 1.0, 2.0, 4.0],
"severity_sigma_grid": [0.2, 0.5, 1.0, 2.0],
"n_nuisance_grid": [0, 5, 15],
"n_visitors_grid": [250, 500, 1000, 2000, 5000],
"treatment_allocation_grid": [0.3, 0.5, 0.7],
"benefit_prevalence_grid": [0.05, 0.15, 0.30],
"baseline_rate_grid": [0.02, 0.05, 0.10, 0.20],
"revenue_model_grid": ["lognormal", _DEFAULT_CART_CONFIG],
"channel_mode_grid": ["mirror", "opposing", "conversion_only", "severity_only"],
"default_n_visitors": 1000,
"categorical_driver": "is_returning",
},
}