Source code for pytyche.generators.scenarios

"""V2 stress scenario catalog for BCF development.

Public API
----------
Template-based API:

- ``TEMPLATES``: ``dict[str, Callable]`` — registered scenario templates.
  Keys: ``"constant"``, ``"reversal"``, ``"sparse_benefit"``,
  ``"nonlinear"``, ``"monotone_gradient"``, ``"interaction_only"``,
  ``"clustered"``. Each template callable has signature
  ``(metric_id, effect_scale, ...) -> (V2GeneratorConfig, dict)``.
- ``build_catalog(...)`` returns ``list[ScenarioSpec]`` — a star-sweep
  catalog of ``(scenario_fn, kwargs)`` tuples for ``run_suite_parallel``.
- ``PROFILES``: ``dict[str, dict]`` — preset catalog configurations.
  Keys: ``"validate"``, ``"standard"``, ``"thorough"``.

Design
------
- Template callables accept ``seed`` and other knobs as kwargs so that
  ``(template_fn, kwargs)`` pairs are fully reproducible.
- Metadata dicts carry at minimum ``"scenario_class"`` plus all
  control-knob keys so downstream benchmarking loops can group, filter,
  and compare runs.
- Configs are ready to pass directly to ``generate_v2_core()``.
- No imports from v1 generator internals.
"""

from __future__ import annotations

import dataclasses
from collections.abc import Callable
from typing import Any, cast

import numpy as np
import pandas as pd
from scipy.special import expit, logit

from pytyche.contracts import CartRevenueConfig, ProductCategory
from pytyche.generators.core import (
    AssignmentConfig,
    CopulaConfig,
    FeatureSpec,
    MetricMode,
    MixtureConfig,
    SurfaceConfig,
    V2GeneratorConfig,
    sigmoid_surface,
    threshold_surface,
)

# Re-export for backward compatibility — callers that import these from
# generate_scenarios continue to work after the move to contracts.py.
__all__ = [
    "CartRevenueConfig",
    "ProductCategory",
]


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _identity_corr(n: int) -> np.ndarray:
    """Return an n×n identity correlation matrix (no inter-feature correlation)."""
    return np.eye(n)


def _relative_lift_to_logit_shift(conversion_delta: float, baseline_rate: float) -> float:
    """Convert relative lift to logit-space shift.

    ``conversion_delta=0.15`` means "+15% relative lift."
    At ``baseline_rate=0.03``: ``p1 = 0.03 × 1.15 = 0.0345``.
    The returned shift satisfies ``expit(logit(baseline_rate) + shift) == p1``.

    Parameters
    ----------
    conversion_delta:
        Relative lift (e.g. +0.15 = +15%, -0.20 = -20%).
    baseline_rate:
        Baseline conversion probability at the population centroid.

    Returns
    -------
    float
        Logit-space shift to apply to the baseline logit.
    """
    p0 = baseline_rate
    p1 = p0 * (1.0 + conversion_delta)
    # Clamp p1 to a valid probability range to avoid logit(0) or logit(1).
    p1 = max(min(p1, 0.9999), 0.0001)
    return float(logit(p1) - logit(p0))


def _severity_multiplicative_factor(severity_delta: float) -> float:
    """Convert a relative severity lift to a multiplicative factor.

    ``severity_delta=0.10`` means "+10% AOV lift": ``m1 = m0 × 1.10``.
    The returned factor is ``1 + severity_delta``.

    Parameters
    ----------
    severity_delta:
        Relative lift (e.g. +0.10 = +10%, -0.15 = -15%).

    Returns
    -------
    float
        Multiplicative factor for the severity mean.
    Raises
    ------
    ValueError
        If ``severity_delta <= -1.0`` (would produce a non-positive factor).
    """
    if severity_delta <= -1.0:
        raise ValueError(
            f"severity_delta must be > -1.0 to keep factor positive, got {severity_delta}"
        )
    return 1.0 + severity_delta


# ---------------------------------------------------------------------------
# Default mixed-type feature schema (Task 3.1)
# ---------------------------------------------------------------------------

#: Default core features for non-clustered templates.
#: Semantic e-commerce names, mixed types: 2 continuous + 2 categorical + 1 binary.
_DEFAULT_CORE_FEATURES: tuple[FeatureSpec, ...] = (
    FeatureSpec(name="session_recency", kind="continuous"),
    FeatureSpec(name="browse_depth", kind="continuous"),
    FeatureSpec(name="device_type", kind="categorical"),
    FeatureSpec(name="is_returning", kind="binary"),
    FeatureSpec(name="channel", kind="categorical"),
)


def _build_default_mixture_sampler(n_nuisance: int = 5) -> MixtureConfig:
    """Build the default single-cluster MixtureConfig with realistic e-commerce marginals.

    Schema:
        session_recency — continuous, mean=2.0 std=1.5 (right-skewed recency proxy)
        browse_depth    — continuous, mean=3.0 std=2.0 (pages viewed per session)
        device_type     — categorical, mobile 55% / desktop 35% / tablet 10%
        is_returning    — binary, 35% True (65% new visitors)
        channel         — categorical, organic 40% / paid 30% / social 20% / email 10%
        z0..z{n-1}      — continuous nuisance features, N(0, 1)
    """
    nuisance_features: tuple[FeatureSpec, ...] = tuple(
        FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)
    )
    all_features = _DEFAULT_CORE_FEATURES + nuisance_features

    cluster_params: dict[str, Any] = {
        "session_recency_mean": 2.0,
        "session_recency_std": 1.5,
        "browse_depth_mean": 3.0,
        "browse_depth_std": 2.0,
        "device_type_probs": {"mobile": 0.55, "desktop": 0.35, "tablet": 0.10},
        "is_returning_prob": 0.35,
        "channel_probs": {"organic": 0.40, "paid": 0.30, "social": 0.20, "email": 0.10},
    }
    for i in range(n_nuisance):
        cluster_params[f"z{i}_mean"] = 0.0
        cluster_params[f"z{i}_std"] = 1.0

    return MixtureConfig(
        features=all_features,
        weights=(1.0,),
        cluster_params=(cluster_params,),
    )


def _const_surface(value: float) -> SurfaceConfig:
    """Surface that returns the same scalar for every visitor."""
    return SurfaceConfig(fn=lambda X, v=value: np.full(len(X), v))


[docs] @dataclasses.dataclass(frozen=True) class DriverInfo: """Resolved driver feature statistics from a sampler config. Two kinds: ``"continuous"`` or ``"categorical"``. Binary features (FeatureSpec kind="binary") map to categorical with ``levels=(True, False)`` or ``(False, True)`` depending on frequency. Fields: name: Feature column name. kind: ``"continuous"`` or ``"categorical"`` — no separate binary kind. mean: Population-weighted mean (continuous only, 0.0 for categorical). std: Population marginal std (continuous only, 1.0 for categorical). levels: Actual level values sorted by ``(-prob, str(level))`` for deterministic tie-breaking. ``None`` for continuous. """ name: str kind: str # "continuous" or "categorical" mean: float = 0.0 std: float = 1.0 levels: tuple[Any, ...] | None = None def __post_init__(self) -> None: if self.kind not in ("continuous", "categorical"): raise ValueError( f"DriverInfo: kind must be 'continuous' or 'categorical', got {self.kind!r}" ) if self.kind == "categorical" and self.levels is None: raise ValueError("DriverInfo: categorical driver must have levels") if self.kind == "continuous" and self.levels is not None: raise ValueError("DriverInfo: continuous driver must not have levels")
_VALID_SIGNAL_TYPES: frozenset[str] = frozenset({"linear", "interaction", "quadratic", "step"})
[docs] @dataclasses.dataclass(frozen=True) class SignalTerm: """One additive term in a within-cluster CATE surface. Fields: signal_type: One of ``"linear"``, ``"interaction"``, ``"quadratic"``, ``"step"``. drivers: Feature name(s) for this term. Count is type-dependent. amplitude: Signed weight for this term. level_values: Per-level signal values for ``"step"`` type; ``None`` for others. """ signal_type: str drivers: tuple[str, ...] amplitude: float level_values: dict[str, float] | None = None def __post_init__(self) -> None: if self.signal_type not in _VALID_SIGNAL_TYPES: raise ValueError( f"SignalTerm: signal_type must be one of {sorted(_VALID_SIGNAL_TYPES)}, " f"got {self.signal_type!r}" ) if self.signal_type == "linear": if len(self.drivers) != 1: raise ValueError( f"SignalTerm: 'linear' requires exactly 1 driver, " f"got {len(self.drivers)}" ) elif self.signal_type == "interaction": if len(self.drivers) != 2: raise ValueError( f"SignalTerm: 'interaction' requires exactly 2 drivers, " f"got {len(self.drivers)}" ) elif self.signal_type == "quadratic": if len(self.drivers) != 1: raise ValueError( f"SignalTerm: 'quadratic' requires exactly 1 driver, " f"got {len(self.drivers)}" ) elif self.signal_type == "step": if len(self.drivers) != 1: raise ValueError( f"SignalTerm: 'step' requires exactly 1 driver, " f"got {len(self.drivers)}" ) if not self.level_values: raise ValueError( "SignalTerm: 'step' requires level_values to be provided and non-empty" )
[docs] @dataclasses.dataclass(frozen=True) class ClusterEffectProfile: """Treatment effect + prognostic specification for one cluster. The latent cluster type drives both prognostic outcomes (p0, m0) and treatment response (p1-p0, m1-m0). Observed features are noisy windows into the latent type — BCF only sees features, not cluster_id. Fields: conversion_delta: Relative lift for conversion probability (e.g. +0.15 = +15%). severity_delta: Relative lift for severity mean (e.g. -0.10 = -10%). conversion_signals: Additive within-cluster signal terms for conversion HTE. severity_signals: Additive within-cluster signal terms for severity HTE. prognostic_conv_base: Logit-space shift from global baseline_rate for this cluster's p0. Centered across clusters so weighted mean = 0. prognostic_sev_base: Log-space shift from global m0_base for this cluster's m0. Centered across clusters so weighted mean = 0. prognostic_conv_signals: Within-cluster feature-driven p0 gradients. prognostic_sev_signals: Within-cluster feature-driven m0 gradients. """ conversion_delta: float = 0.0 severity_delta: float = 0.0 conversion_signals: tuple[SignalTerm, ...] = () severity_signals: tuple[SignalTerm, ...] = () prognostic_conv_base: float = 0.0 prognostic_sev_base: float = 0.0 prognostic_conv_signals: tuple[SignalTerm, ...] = () prognostic_sev_signals: tuple[SignalTerm, ...] = ()
def _resolve_driver( driver: str, sampler: CopulaConfig | MixtureConfig, ) -> DriverInfo: """Resolve driver feature statistics from a sampler config. Extracts population-level statistics (mean, std, or level probabilities) from the sampler so templates can center surfaces and select target levels without hardcoding feature-specific values. Parameters ---------- driver: Feature name to resolve. sampler: Active sampler config (``MixtureConfig`` or ``CopulaConfig``). Returns ------- DriverInfo Resolved statistics for the driver feature. Raises ------ ValueError If the driver is not found in the sampler's feature list. """ # Find feature spec in sampler. feat_spec: FeatureSpec | None = None for spec in sampler.features: if spec.name == driver: feat_spec = spec break if feat_spec is None: raise ValueError( f"Driver feature {driver!r} not found in sampler features. " f"Known features: {[s.name for s in sampler.features]}" ) if isinstance(sampler, MixtureConfig): return _resolve_driver_mixture(driver, feat_spec, sampler) else: return _resolve_driver_copula(driver, feat_spec) def _resolve_driver_mixture( driver: str, feat_spec: FeatureSpec, sampler: MixtureConfig, ) -> DriverInfo: """Resolve driver from MixtureConfig using weighted population stats.""" total_weight = sum(sampler.weights) w_norm = [w / total_weight for w in sampler.weights] if feat_spec.kind == "continuous": pop_mean = sum( w * params[f"{driver}_mean"] for w, params in zip(w_norm, sampler.cluster_params, strict=False) ) pop_var = ( sum( w * (params[f"{driver}_std"] ** 2 + params[f"{driver}_mean"] ** 2) for w, params in zip(w_norm, sampler.cluster_params, strict=False) ) - pop_mean**2 ) pop_std = float(np.sqrt(max(pop_var, 0.0))) return DriverInfo(name=driver, kind="continuous", mean=pop_mean, std=pop_std) elif feat_spec.kind == "binary": # Binary → categorical with bool levels. prob_true = sum( w * params[f"{driver}_prob"] for w, params in zip(w_norm, sampler.cluster_params, strict=False) ) prob_false = 1.0 - prob_true # Sort by (-prob, str(level)) for deterministic ordering. level_probs = [(True, prob_true), (False, prob_false)] sorted_levels = sorted(level_probs, key=lambda lp: (-lp[1], str(lp[0]))) levels = tuple(lp[0] for lp in sorted_levels) return DriverInfo(name=driver, kind="categorical", levels=levels) else: # Categorical: aggregate weighted probabilities across clusters. combined_probs: dict[str, float] = {} for w, params in zip(w_norm, sampler.cluster_params, strict=False): probs_dict = params.get(f"{driver}_probs", {}) for level, p in probs_dict.items(): combined_probs[level] = combined_probs.get(level, 0.0) + w * p sorted_levels_cat = sorted( combined_probs.keys(), key=lambda lvl: (-combined_probs[lvl], str(lvl)) ) levels = tuple(sorted_levels_cat) return DriverInfo(name=driver, kind="categorical", levels=levels) def _resolve_driver_copula( driver: str, feat_spec: FeatureSpec, ) -> DriverInfo: """Resolve driver from CopulaConfig using known defaults.""" if feat_spec.kind == "continuous": return DriverInfo(name=driver, kind="continuous", mean=0.0, std=1.0) elif feat_spec.kind == "binary": # Default binary: 0.5 each → tie-broken by str: "False" < "True" return DriverInfo(name=driver, kind="categorical", levels=(False, True)) else: raise ValueError( f"CopulaConfig does not provide category level information for " f"categorical feature {driver!r}. Use MixtureConfig for " f"categorical driver resolution." ) # --------------------------------------------------------------------------- # Shared builder (Task 3.1) # --------------------------------------------------------------------------- def _build_scenario( *, template_name: str, p0_surface: SurfaceConfig, p1_surface_fn: Callable[[float], SurfaceConfig], metric_id: str, effect_scale: float, severity_sigma: float = 0.5, n_visitors: int = 1000, n_nuisance: int = 5, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, sampler: CopulaConfig | MixtureConfig | None = None, revenue_model: str | CartRevenueConfig = "lognormal", m0_surface: SurfaceConfig | None = None, m1_surface_fn: Callable[[float], SurfaceConfig] | None = None, channel_mode: str | None = None, prognostic_features: list[str] | None = None, prescriptive_features: list[str] | None = None, driver: str | None = None, severity_driver: str = "browse_depth", ) -> tuple[V2GeneratorConfig, dict]: """Shared wiring for all template-based scenario construction. Handles feature list construction, sampler setup, MetricMode assembly, hurdle severity mirroring, config assembly, and metadata emission. Parameters ---------- template_name: Used in metadata as ``scenario_class`` and ``template``. p0_surface: Baseline conversion surface (control arm probability surface). p1_surface_fn: Callable of ``effect_scale`` returning the treatment conversion surface. Must produce p1 == p0 when effect_scale == 0. metric_id: ``"conversion_rate"`` or ``"revenue_per_visitor"``. effect_scale: Controls CATE magnitude. 0 = A/A (p1 = p0). severity_sigma: LogNormal sigma for hurdle severity surfaces. n_visitors: Total visitor count. n_nuisance: Number of additional uncorrelated nuisance features (z0..z{n-1}). treatment_allocation: Fraction assigned to treatment. seed: Random seed. baseline_rate: Baseline conversion probability. Sets logit-additive intercept for new shapes; for existing shapes with their own baseline structure, this is recorded in metadata but shapes may use their own constants. sampler: Override the default MixtureConfig sampler. When ``MixtureConfig`` is provided, ``cluster_id`` is available to surfaces during generation but stripped from output by ``generate_v2_core`` (latent, not observed). revenue_model: Revenue model type. ``"lognormal"`` (default) or a CartRevenueConfig. m1_surface_fn: Optional callable of ``effect_scale`` returning the treatment severity mean surface for hurdle metrics. When provided, overrides the default constant m1 surface (``m0_base + effect_scale * 5``). When ``None``, falls back to the constant severity offset. Used by templates that want severity surfaces to mirror the conversion spatial pattern. channel_mode: Controls severity surface construction for hurdle metrics. Binary metrics ignore this parameter. - ``None`` / ``"mirror"`` (default): existing behavior — use ``m1_surface_fn`` if provided, else constant offset. - ``"opposing"``: negate the severity delta. If ``m1_surface_fn`` is provided, wrap its output to flip the sign of the delta around ``m0_base``; otherwise ``m1 = m0_base - effect_scale * 5``. - ``"conversion_only"``: ``m0 = m1`` (identical severity surfaces, zero AOV component). - ``"severity_only"``: ``p0 = p1`` (identical conversion surfaces, zero conversion component). Severity varies per the normal ``m1_surface_fn`` / constant offset logic. - ``"orthogonal"``: conversion uses the template's driver, severity uses ``severity_driver`` (independent features). The template's ``p1_surface_fn`` is preserved; the severity surface becomes ``m1 = m0_base + effect_scale * severity_slope * (severity_driver - mean)``. prognostic_features: Optional list of feature names that drive only the baseline surfaces (``p0``, ``m0``). When specified, baseline surfaces receive only these columns from the feature matrix X. When ``None``, all features are available to baseline surfaces (backward compatible). prescriptive_features: Optional list of feature names that drive only the treatment delta surfaces (``p1 - p0``, ``m1 - m0``). When specified, treatment surfaces receive only these columns. When ``None``, all features are available (backward compatible). driver: Name of the primary feature driving CATE heterogeneity. When provided, recorded in metadata as ``"driver"``. When ``None``, the key is absent from metadata. severity_driver: Feature name for the orthogonal severity channel (default ``"browse_depth"``). Only used when ``channel_mode="orthogonal"``. Returns ------- tuple[V2GeneratorConfig, dict] Config ready for ``generate_v2_core()`` and machine-readable metadata. """ # 1. Sampler setup: use provided sampler or build default MixtureConfig. # Default is a single-cluster MixtureConfig with semantic mixed-type e-commerce features. # CopulaConfig is still available via the sampler override. if sampler is None: feature_sampler: CopulaConfig | MixtureConfig = _build_default_mixture_sampler(n_nuisance) else: feature_sampler = sampler # 2b. Feature separation: wrap surfaces to zero out non-designated columns in X. # When prognostic_features is specified, p0 (baseline) receives X with all other # continuous feature columns zeroed — removing their variation. # When prescriptive_features is specified, p1 surface receives X with all other # continuous feature columns zeroed. # When neither is specified, all features are shared (backward compatible). # Zeroing (not projection) is used so surface functions that read any column # still work — they just receive 0.0 for non-designated features. def _zero_non_designated(X: pd.DataFrame, keep_cols: list[str]) -> pd.DataFrame: """Return a copy of X with continuous (float) feature columns NOT in keep_cols set to 0.""" X_masked = X.copy() for col in X_masked.columns: if col not in keep_cols and X_masked[col].dtype == float: X_masked[col] = 0.0 return X_masked if prognostic_features is not None: _prog_cols = list(prognostic_features) _orig_p0_fn = p0_surface.fn p0_surface = SurfaceConfig( fn=lambda X, _fn=_orig_p0_fn, _cols=_prog_cols: _fn(_zero_non_designated(X, _cols)) ) if prescriptive_features is not None: _presc_cols = list(prescriptive_features) _orig_p1_fn_builder = p1_surface_fn def _wrapped_p1_surface_fn(es: float, _builder=_orig_p1_fn_builder, _cols=_presc_cols) -> SurfaceConfig: _inner = _builder(es) return SurfaceConfig(fn=lambda X, _fn=_inner.fn, _c=_cols: _fn(_zero_non_designated(X, _c))) p1_surface_fn = _wrapped_p1_surface_fn # Also wrap m1_surface_fn (severity delta) when provided, so hurdle # scenarios respect prescriptive feature separation on both channels. if m1_surface_fn is not None: _orig_m1_fn_builder = m1_surface_fn def _wrapped_m1_surface_fn(es: float, _builder=_orig_m1_fn_builder, _cols=_presc_cols) -> SurfaceConfig: _inner = _builder(es) return SurfaceConfig(fn=lambda X, _fn=_inner.fn, _c=_cols: _fn(_zero_non_designated(X, _c))) m1_surface_fn = _wrapped_m1_surface_fn # 3. Reject incompatible knobs early. if channel_mode is not None and metric_id == "conversion_rate": raise ValueError( f"channel_mode={channel_mode!r} is not supported for binary metric " f"{metric_id!r}; channel_mode applies only to hurdle metrics" ) # 4. Build treatment surface from the callable p1_surface = p1_surface_fn(effect_scale) # 5. MetricMode assembly if metric_id == "conversion_rate": metric_mode = MetricMode( metric_id="conversion_rate", p0_surface=p0_surface, p1_surface=p1_surface, ) elif metric_id == "revenue_per_visitor": # Severity surfaces: constant m0 at 50.0. # channel_mode controls how m0/m1/p0/p1 are assembled. _m0_base = 50.0 _severity_slope = 5.0 sigma0_surface = _const_surface(severity_sigma) sigma1_surface = _const_surface(severity_sigma) _resolved_mode = channel_mode if channel_mode is not None else "mirror" if _resolved_mode == "mirror": # Default: use m1_surface_fn if provided, else constant offset. if m0_surface is None: m0_surface = _const_surface(_m0_base) if m1_surface_fn is not None: m1_surface = m1_surface_fn(effect_scale) else: m1_surface = _const_surface(_m0_base + effect_scale * _severity_slope) effective_p0 = p0_surface effective_p1 = p1_surface elif _resolved_mode == "opposing": # Negate severity delta: m1 reflects opposite direction to m1_surface_fn. m0_surface = _const_surface(_m0_base) if m1_surface_fn is not None: # Wrap m1_surface_fn output to negate delta around m0_base: # m1_negated = m0_base - (m1_surface_fn(es)(X) - m0_base) # = 2 * m0_base - m1_surface_fn(es)(X) _raw_m1 = m1_surface_fn(effect_scale) m1_surface = SurfaceConfig( fn=lambda X, _m=_raw_m1, _b=_m0_base: 2.0 * _b - _m.fn(X) ) else: m1_surface = _const_surface(_m0_base - effect_scale * _severity_slope) effective_p0 = p0_surface effective_p1 = p1_surface elif _resolved_mode == "conversion_only": # m0 == m1 (identical severity surfaces — zero AOV component). m0_surface = _const_surface(_m0_base) m1_surface = _const_surface(_m0_base) effective_p0 = p0_surface effective_p1 = p1_surface elif _resolved_mode == "severity_only": # p0 == p1 (identical conversion surfaces — zero conversion component). # Severity varies per the normal m1_surface_fn / constant offset logic. m0_surface = _const_surface(_m0_base) if m1_surface_fn is not None: m1_surface = m1_surface_fn(effect_scale) else: m1_surface = _const_surface(_m0_base + effect_scale * _severity_slope) # Override p1 to equal p0 (no conversion effect). effective_p0 = p0_surface effective_p1 = p0_surface elif _resolved_mode == "orthogonal": # Conversion uses the template's driver (via p1_surface_fn); severity uses # severity_driver (independent feature). Center around population mean so # the severity CATE flips sign around 0. _sev_info = _resolve_driver(severity_driver, feature_sampler) _sev_mu = _sev_info.mean m0_surface = _const_surface(_m0_base) m1_surface = SurfaceConfig( fn=lambda X, _es=effect_scale, _sl=_severity_slope, _b=_m0_base, _mu=_sev_mu, _sd=severity_driver: ( _b + _es * _sl * (X[_sd].values.astype(float) - _mu) ) ) effective_p0 = p0_surface effective_p1 = p1_surface else: raise ValueError( f"_build_scenario: unsupported channel_mode {_resolved_mode!r}. " "Must be one of 'mirror', 'opposing', 'conversion_only', " "'severity_only', 'orthogonal', or None." ) metric_mode = MetricMode( metric_id="revenue_per_visitor", p0_surface=effective_p0, p1_surface=effective_p1, m0_surface=m0_surface, m1_surface=m1_surface, sigma0_surface=sigma0_surface, sigma1_surface=sigma1_surface, ) else: raise ValueError( f"_build_scenario: unsupported metric_id {metric_id!r}. " "Must be 'conversion_rate' or 'revenue_per_visitor'." ) # 5. Config assembly # Pass CartRevenueConfig to V2GeneratorConfig.revenue_model so generate_core # can dispatch to the cart sampler instead of lognormal. cart_config: CartRevenueConfig | None = ( revenue_model if isinstance(revenue_model, CartRevenueConfig) else None ) config = V2GeneratorConfig( n_visitors=n_visitors, feature_sampler=feature_sampler, metric_mode=metric_mode, assignment=AssignmentConfig(treatment_allocation=treatment_allocation), seed=seed, experiment_id=f"{template_name}-es{effect_scale}-seed{seed}", revenue_model=cart_config, ) # 6. Metadata metadata: dict[str, str | int | float] = { "scenario_class": template_name, "template": template_name, "metric_id": metric_id, "effect_scale": effect_scale, "n_visitors": n_visitors, "seed": seed, "treatment_allocation": treatment_allocation, "baseline_rate": baseline_rate, "n_nuisance": n_nuisance, } # Record revenue_model in metadata: "lognormal" for default, "cart" for CartRevenueConfig. if isinstance(revenue_model, CartRevenueConfig): metadata["revenue_model"] = "cart" else: metadata["revenue_model"] = revenue_model # Record driver in metadata when provided. if driver is not None: metadata["driver"] = driver return config, metadata # --------------------------------------------------------------------------- # Shared template helpers # --------------------------------------------------------------------------- def _resolve_categorical_target( driver_info: DriverInfo, driver_target_level: Any | None, template_name: str, ) -> tuple[Any, str]: """Resolve target level for a categorical driver. Returns ``(target_value, "auto"|"explicit")``. Auto-selects the most frequent level when ``driver_target_level`` is ``None``. """ assert driver_info.levels is not None, ( f"_resolve_categorical_target called for non-categorical driver " f"{driver_info.name!r}" ) if driver_target_level is None: return driver_info.levels[0], "auto" if driver_target_level not in driver_info.levels: raise ValueError( f"_template_{template_name}: driver_target_level={driver_target_level!r} " f"not in driver {driver_info.name!r} levels: {driver_info.levels}" ) return driver_target_level, "explicit" def _build_signal( driver_info: DriverInfo, driver: str, driver_target_level: Any | None, template_name: str, *, cat_target_val: float, cat_other_val: float, ) -> tuple[Callable, dict]: """Build a zero-centered spatial signal from a driver feature. Continuous drivers produce ``lambda X: X[driver] - mu``. Categorical drivers produce a contrast function using ``cat_target_val`` for the target level and ``cat_other_val`` for all others. Returns ``(signal_fn, extra_metadata_dict)``. """ if driver_info.kind == "continuous": _mu = driver_info.mean def signal_fn_continuous(X, _d=driver, _m=_mu): return np.asarray(X[_d], dtype=float) - _m return signal_fn_continuous, {} else: _t, _src = _resolve_categorical_target(driver_info, driver_target_level, template_name) def signal_fn_categorical(X, _d=driver, _tgt=_t, _tv=cat_target_val, _ov=cat_other_val): return np.where(np.asarray(X[_d]) == _tgt, _tv, _ov).astype(float) return signal_fn_categorical, {"driver_target_level": _t, "driver_target_source": _src} def _evaluate_signal( term: SignalTerm, X: pd.DataFrame, driver_info: dict[str, DriverInfo], ) -> np.ndarray: """Evaluate a SignalTerm against feature matrix X using driver_info statistics. Dispatches on ``term.signal_type``: - ``"linear"``: ``_build_signal`` encoding (centered continuous or contrast-coded categorical) multiplied by ``term.amplitude``. - ``"interaction"``: product of two ``_build_signal`` encodings, multiplied by ``term.amplitude``. - ``"quadratic"``: squared centered continuous value, multiplied by ``term.amplitude``. Raises ``ValueError`` for categorical drivers. - ``"step"``: per-level values from ``term.level_values`` mapped onto ``X[driver]``; unknown levels receive 0.0. Multiplied by ``term.amplitude``. Parameters ---------- term: SignalTerm describing signal type, drivers, amplitude, and (for step) level_values. X: Feature DataFrame with one row per visitor. driver_info: Mapping from feature name to resolved DriverInfo statistics. Returns ------- np.ndarray 1-D array of shape ``(len(X),)`` with the evaluated signal values. Raises ------ ValueError If ``signal_type="quadratic"`` is used with a categorical driver. """ if term.signal_type == "linear": driver = term.drivers[0] signal_fn, _ = _build_signal( driver_info[driver], driver, None, "_evaluate_signal", cat_target_val=0.5, cat_other_val=-0.5, ) return term.amplitude * signal_fn(X) elif term.signal_type == "interaction": driver0 = term.drivers[0] driver1 = term.drivers[1] signal_fn0, _ = _build_signal( driver_info[driver0], driver0, None, "_evaluate_signal", cat_target_val=0.5, cat_other_val=-0.5, ) signal_fn1, _ = _build_signal( driver_info[driver1], driver1, None, "_evaluate_signal", cat_target_val=0.5, cat_other_val=-0.5, ) return term.amplitude * (signal_fn0(X) * signal_fn1(X)) elif term.signal_type == "quadratic": driver = term.drivers[0] info = driver_info[driver] if info.kind != "continuous": raise ValueError( f"_evaluate_signal: signal_type='quadratic' requires a continuous driver, " f"but driver={driver!r} has kind={info.kind!r}. " f"Use a continuous feature for quadratic terms." ) centered = np.asarray(X[driver], dtype=float) - info.mean return term.amplitude * (centered ** 2) elif term.signal_type == "step": driver = term.drivers[0] col = np.asarray(X[driver]) values = np.zeros(len(X), dtype=float) for level, val in (term.level_values or {}).items(): values[col == level] = val return term.amplitude * values else: raise ValueError( f"_evaluate_signal: unhandled signal_type={term.signal_type!r}" ) def _logit_additive_surfaces( intercept: float, raw_signal_fn: Callable, *, conversion_slope: float = 1.0, severity_slope: float = 5.0, m0_base: float = 50.0, ) -> tuple[SurfaceConfig, Callable[[float], SurfaceConfig], Callable[[float], SurfaceConfig]]: """Build the logit-additive (p0, p1_fn, m1_fn) triple from a raw spatial signal. Returns: p0_surface: sigmoid(intercept) — constant baseline. p1_surface_fn: es → sigmoid(intercept + es × conversion_slope × signal(X)) m1_surface_fn: es → m0_base + es × severity_slope × signal(X) """ p0_surface = sigmoid_surface( SurfaceConfig(fn=lambda X, _ic=intercept: np.full(len(X), _ic)) ) def p1_surface_fn(es: float) -> SurfaceConfig: return sigmoid_surface( SurfaceConfig( fn=lambda X, _es=es, _ic=intercept, _sl=conversion_slope, _sig=raw_signal_fn: ( _ic + _es * _sl * _sig(X) ) ) ) def m1_surface_fn(es: float) -> SurfaceConfig: return SurfaceConfig( fn=lambda X, _es=es, _ssl=severity_slope, _b=m0_base, _sig=raw_signal_fn: ( _b + _es * _ssl * _sig(X) ) ) return p0_surface, p1_surface_fn, m1_surface_fn def _require_continuous( driver_info: DriverInfo, param_label: str, template_name: str, ) -> None: """Raise ValueError if a driver is not continuous.""" if driver_info.kind != "continuous": raise ValueError( f"{template_name} template requires continuous drivers, but " f"{param_label}={driver_info.name!r} has kind={driver_info.kind!r}. " f"Use a continuous feature (e.g. 'session_recency', 'browse_depth') for {template_name}." ) def _make_correlated_nuisance( n_clusters: int, n_correlated: int, *, spread: float = 1.5, rng_seed: int = 98765, ) -> tuple[tuple[FeatureSpec, ...], tuple[dict[str, Any], ...]]: """Generate cluster-correlated nuisance features. These features have cluster-varying distributions (different means for continuous, different probability vectors for categorical) so they correlate with latent cluster membership. They are NOT added to any ``SignalTerm`` — they exist purely as spurious correlates of the latent type, creating a realistic "haystack" for the estimator. Parameters ---------- n_clusters: Number of clusters in the mixture. n_correlated: Total number of correlated nuisance features to generate. Split ~60% continuous (``xc0``, ``xc1``, ...) and ~40% categorical (``xk0``, ``xk1``, ...). spread: Controls how separated cluster means are for continuous features. Higher = more separable (easier); lower = noisier projection. rng_seed: Seed for deterministic generation of per-cluster parameters. Returns ------- (feature_specs, per_cluster_param_dicts) ``feature_specs``: tuple of FeatureSpec for the new features. ``per_cluster_param_dicts``: tuple of dicts (one per cluster) with the sampling parameters to merge into cluster_params. """ if n_correlated <= 0: empty_params = tuple({} for _ in range(n_clusters)) return (), empty_params rng = np.random.default_rng(rng_seed) n_cont = round(n_correlated * 0.6) n_cat = n_correlated - n_cont feature_specs: list[FeatureSpec] = [] # Initialize one param dict per cluster per_cluster: list[dict[str, Any]] = [{} for _ in range(n_clusters)] # --- Continuous features: cluster-specific means, shared std=1.0 --- for i in range(n_cont): name = f"xc{i}" feature_specs.append(FeatureSpec(name=name, kind="continuous")) # Draw cluster means from N(0, spread) — seeded so deterministic cluster_means = rng.normal(0.0, spread, size=n_clusters) for c in range(n_clusters): per_cluster[c][f"{name}_mean"] = float(cluster_means[c]) per_cluster[c][f"{name}_std"] = 1.0 # --- Categorical features: cluster-specific probability vectors --- for i in range(n_cat): name = f"xk{i}" n_levels = int(rng.integers(3, 6)) # 3–5 levels levels = [f"{name}_L{j}" for j in range(n_levels)] feature_specs.append(FeatureSpec(name=name, kind="categorical")) # Draw cluster-specific probability vectors from Dirichlet # Concentration < 1 makes distributions peaky (different dominant levels) alpha = np.full(n_levels, 0.5) for c in range(n_clusters): probs = rng.dirichlet(alpha) per_cluster[c][f"{name}_probs"] = { lev: float(p) for lev, p in zip(levels, probs, strict=False) } return tuple(feature_specs), tuple(per_cluster) def _build_clustered_sampler( n_nuisance: int, n_correlated: int = 0, ) -> MixtureConfig: """Build the 2-cluster MixtureConfig for the clustered template. Cluster 0 (60%): high recency, mobile-heavy, more returning visitors. Cluster 1 (40%): low recency, desktop-heavy, fewer returning visitors. """ corr_specs, corr_params = _make_correlated_nuisance(2, n_correlated) nuisance_features = [FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)] all_feature_specs = tuple(list(_DEFAULT_CORE_FEATURES) + nuisance_features + list(corr_specs)) nuisance_params = { **{f"z{i}_mean": 0.0 for i in range(n_nuisance)}, **{f"z{i}_std": 1.0 for i in range(n_nuisance)}, } return MixtureConfig( features=all_feature_specs, weights=(0.6, 0.4), cluster_params=( { "session_recency_mean": 1.5, "session_recency_std": 1.0, "browse_depth_mean": 3.0, "browse_depth_std": 1.0, "device_type_probs": {"mobile": 0.60, "desktop": 0.25, "tablet": 0.10, "other": 0.05}, "is_returning_prob": 0.45, "channel_probs": {"direct": 0.40, "organic": 0.30, "paid": 0.20, "social": 0.10}, **nuisance_params, **corr_params[0], }, { "session_recency_mean": -1.5, "session_recency_std": 1.0, "browse_depth_mean": 1.5, "browse_depth_std": 1.0, "device_type_probs": {"mobile": 0.20, "desktop": 0.60, "tablet": 0.15, "other": 0.05}, "is_returning_prob": 0.20, "channel_probs": {"direct": 0.15, "organic": 0.25, "paid": 0.45, "social": 0.15}, **nuisance_params, **corr_params[1], }, ), ) def _build_4cluster_ecommerce( n_nuisance: int = 5, n_correlated: int = 0, ) -> MixtureConfig: """Build the 4-cluster MixtureConfig for the clustered_realistic template. Segments: 0 (30%): loyal-mobile — frequent mobile shoppers, high recency, returning. 1 (25%): new-desktop — new desktop visitors, medium recency, not returning. 2 (25%): deal-seekers — paid/email channel, medium browse depth. 3 (20%): inactive — low recency, low browse depth, mostly organic. """ corr_specs, corr_params = _make_correlated_nuisance(4, n_correlated) nuisance_features = [FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)] all_feature_specs = tuple(list(_DEFAULT_CORE_FEATURES) + nuisance_features + list(corr_specs)) nuisance_params = { **{f"z{i}_mean": 0.0 for i in range(n_nuisance)}, **{f"z{i}_std": 1.0 for i in range(n_nuisance)}, } return MixtureConfig( features=all_feature_specs, weights=(0.30, 0.25, 0.25, 0.20), cluster_params=( # Cluster 0: loyal-mobile { "session_recency_mean": 2.5, "session_recency_std": 0.8, "browse_depth_mean": 4.5, "browse_depth_std": 1.5, "device_type_probs": {"mobile": 0.75, "desktop": 0.15, "tablet": 0.10}, "is_returning_prob": 0.70, "channel_probs": {"organic": 0.50, "direct": 0.30, "paid": 0.15, "social": 0.05}, **nuisance_params, **corr_params[0], }, # Cluster 1: new-desktop { "session_recency_mean": 1.0, "session_recency_std": 1.2, "browse_depth_mean": 2.5, "browse_depth_std": 1.5, "device_type_probs": {"mobile": 0.20, "desktop": 0.70, "tablet": 0.10}, "is_returning_prob": 0.15, "channel_probs": {"organic": 0.35, "paid": 0.35, "social": 0.20, "email": 0.10}, **nuisance_params, **corr_params[1], }, # Cluster 2: deal-seekers { "session_recency_mean": 0.5, "session_recency_std": 1.5, "browse_depth_mean": 5.0, "browse_depth_std": 2.0, "device_type_probs": {"mobile": 0.45, "desktop": 0.40, "tablet": 0.15}, "is_returning_prob": 0.40, "channel_probs": {"paid": 0.45, "email": 0.30, "organic": 0.15, "social": 0.10}, **nuisance_params, **corr_params[2], }, # Cluster 3: inactive { "session_recency_mean": -1.5, "session_recency_std": 1.0, "browse_depth_mean": 1.0, "browse_depth_std": 0.8, "device_type_probs": {"mobile": 0.35, "desktop": 0.50, "tablet": 0.15}, "is_returning_prob": 0.20, "channel_probs": {"organic": 0.60, "social": 0.25, "paid": 0.10, "email": 0.05}, **nuisance_params, **corr_params[3], }, ), ) def _build_6cluster_ecommerce( n_nuisance: int = 5, n_correlated: int = 0, ) -> MixtureConfig: """Build the 6-cluster MixtureConfig for the clustered_complex template. Extends 4-cluster with: 4 (12%): premium-tablet — high AOV, tablet-heavy, returning, low browse depth. 5 (8%): social-browsers — social channel, high browse depth, low conversion intent. Remaining weights re-normalized: (0.25, 0.20, 0.20, 0.15, 0.12, 0.08). """ corr_specs, corr_params = _make_correlated_nuisance(6, n_correlated) nuisance_features = [FeatureSpec(name=f"z{i}", kind="continuous") for i in range(n_nuisance)] all_feature_specs = tuple(list(_DEFAULT_CORE_FEATURES) + nuisance_features + list(corr_specs)) nuisance_params = { **{f"z{i}_mean": 0.0 for i in range(n_nuisance)}, **{f"z{i}_std": 1.0 for i in range(n_nuisance)}, } return MixtureConfig( features=all_feature_specs, weights=(0.25, 0.20, 0.20, 0.15, 0.12, 0.08), cluster_params=( # Cluster 0: loyal-mobile { "session_recency_mean": 2.5, "session_recency_std": 0.8, "browse_depth_mean": 4.5, "browse_depth_std": 1.5, "device_type_probs": {"mobile": 0.75, "desktop": 0.15, "tablet": 0.10}, "is_returning_prob": 0.70, "channel_probs": {"organic": 0.50, "direct": 0.30, "paid": 0.15, "social": 0.05}, **nuisance_params, **corr_params[0], }, # Cluster 1: new-desktop { "session_recency_mean": 1.0, "session_recency_std": 1.2, "browse_depth_mean": 2.5, "browse_depth_std": 1.5, "device_type_probs": {"mobile": 0.20, "desktop": 0.70, "tablet": 0.10}, "is_returning_prob": 0.15, "channel_probs": {"organic": 0.35, "paid": 0.35, "social": 0.20, "email": 0.10}, **nuisance_params, **corr_params[1], }, # Cluster 2: deal-seekers { "session_recency_mean": 0.5, "session_recency_std": 1.5, "browse_depth_mean": 5.0, "browse_depth_std": 2.0, "device_type_probs": {"mobile": 0.45, "desktop": 0.40, "tablet": 0.15}, "is_returning_prob": 0.40, "channel_probs": {"paid": 0.45, "email": 0.30, "organic": 0.15, "social": 0.10}, **nuisance_params, **corr_params[2], }, # Cluster 3: inactive { "session_recency_mean": -1.5, "session_recency_std": 1.0, "browse_depth_mean": 1.0, "browse_depth_std": 0.8, "device_type_probs": {"mobile": 0.35, "desktop": 0.50, "tablet": 0.15}, "is_returning_prob": 0.20, "channel_probs": {"organic": 0.60, "social": 0.25, "paid": 0.10, "email": 0.05}, **nuisance_params, **corr_params[3], }, # Cluster 4: premium-tablet { "session_recency_mean": 1.5, "session_recency_std": 1.0, "browse_depth_mean": 2.0, "browse_depth_std": 1.0, "device_type_probs": {"mobile": 0.15, "desktop": 0.20, "tablet": 0.65}, "is_returning_prob": 0.60, "channel_probs": {"direct": 0.50, "organic": 0.30, "paid": 0.15, "email": 0.05}, **nuisance_params, **corr_params[4], }, # Cluster 5: social-browsers { "session_recency_mean": 0.0, "session_recency_std": 1.5, "browse_depth_mean": 6.0, "browse_depth_std": 2.5, "device_type_probs": {"mobile": 0.60, "desktop": 0.25, "tablet": 0.15}, "is_returning_prob": 0.25, "channel_probs": {"social": 0.70, "organic": 0.15, "paid": 0.10, "email": 0.05}, **nuisance_params, **corr_params[5], }, ), ) # --------------------------------------------------------------------------- # Cluster effect profile presets (Task 6.2) # --------------------------------------------------------------------------- #: Default 2-cluster profiles for the ``clustered`` template. #: Cluster 0: positive conversion lift with linear session_recency signal. #: Cluster 1: negative conversion lift with opposing linear signal. #: Amplitudes are sized so that per-cluster mean CATEs differ by > RECOVERY (0.1) #: at the default baseline_rate=0.05 with the default 2-cluster MixtureConfig. _CLUSTERED_DEFAULT_PROFILES: tuple[ClusterEffectProfile, ...] = ( ClusterEffectProfile( conversion_delta=+0.60, severity_delta=+0.15, conversion_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.5), ), severity_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.3), ), ), ClusterEffectProfile( conversion_delta=-0.50, severity_delta=-0.15, conversion_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.3), ), severity_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.2), ), ), ) #: 4-cluster profiles for ``clustered_realistic``. #: Clusters have distinct response profiles across conversion and severity. _CLUSTERED_REALISTIC_PROFILES: tuple[ClusterEffectProfile, ...] = ( # Cluster 0: loyal-mobile — strong positive lift, recency + browse interaction ClusterEffectProfile( conversion_delta=+0.30, severity_delta=+0.15, conversion_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.6), SignalTerm( signal_type="interaction", drivers=("session_recency", "browse_depth"), amplitude=0.3, ), ), severity_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.4), ), ), # Cluster 1: new-desktop — moderate negative lift, browse depth signal ClusterEffectProfile( conversion_delta=-0.15, severity_delta=-0.05, conversion_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.4), ), severity_signals=(), ), # Cluster 2: deal-seekers — near-zero conversion lift, positive severity ClusterEffectProfile( conversion_delta=+0.05, severity_delta=+0.20, conversion_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.2), ), severity_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.5), ), ), # Cluster 3: inactive — negative lift both channels ClusterEffectProfile( conversion_delta=-0.25, severity_delta=-0.15, conversion_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.3), ), severity_signals=(), ), ) #: 6-cluster profiles for ``clustered_complex``. #: All signal types including reversed amplitudes; stress test for deep policy trees. _CLUSTERED_COMPLEX_PROFILES: tuple[ClusterEffectProfile, ...] = ( # Cluster 0: loyal-mobile — strong positive, recency + browse interaction ClusterEffectProfile( conversion_delta=+0.35, severity_delta=+0.20, conversion_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=0.6), SignalTerm( signal_type="interaction", drivers=("session_recency", "browse_depth"), amplitude=0.4, ), ), severity_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.5), ), ), # Cluster 1: new-desktop — negative linear + quadratic on browse_depth ClusterEffectProfile( conversion_delta=-0.20, severity_delta=-0.10, conversion_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.5), SignalTerm(signal_type="quadratic", drivers=("browse_depth",), amplitude=0.1), ), severity_signals=(), ), # Cluster 2: deal-seekers — small positive, step on channel + linear on depth ClusterEffectProfile( conversion_delta=+0.10, severity_delta=+0.25, conversion_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.3), SignalTerm( signal_type="step", drivers=("channel",), amplitude=0.4, level_values={"paid": 0.8, "organic": -0.2, "direct": 0.0, "social": -0.3}, ), ), severity_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=0.6), ), ), # Cluster 3: inactive — negative lift, reversed recency signal ClusterEffectProfile( conversion_delta=-0.30, severity_delta=-0.20, conversion_signals=( SignalTerm(signal_type="linear", drivers=("session_recency",), amplitude=-0.4), ), severity_signals=(), ), # Cluster 4: premium-tablet — positive lift, interaction with browse depth ClusterEffectProfile( conversion_delta=+0.25, severity_delta=+0.30, conversion_signals=( SignalTerm( signal_type="interaction", drivers=("session_recency", "browse_depth"), amplitude=0.5, ), ), severity_signals=( SignalTerm(signal_type="quadratic", drivers=("browse_depth",), amplitude=0.2), ), ), # Cluster 5: social-browsers — near-zero conversion, negative severity (browse fatigue) ClusterEffectProfile( conversion_delta=+0.02, severity_delta=-0.10, conversion_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.3), ), severity_signals=( SignalTerm(signal_type="linear", drivers=("browse_depth",), amplitude=-0.4), ), ), ) # --------------------------------------------------------------------------- # Template surface shapes (Task 3.2) # --------------------------------------------------------------------------- def _template_constant( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, revenue_model: str | CartRevenueConfig = "lognormal", **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Constant treatment effect: homogeneous CATE across all visitors.""" intercept = float(logit(baseline_rate)) p0, p1_fn, _ = _logit_additive_surfaces( intercept, lambda X: np.ones(len(X)), ) return _build_scenario( template_name="constant", p0_surface=p0, p1_surface_fn=p1_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=sampler, revenue_model=revenue_model, ) def _template_reversal( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, driver: str = "session_recency", driver_target_level: Any | None = None, **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Reversal: CATE flips sign based on the driver feature. Continuous: h(X) = X[driver] - mean (flips at the population mean). Default driver session_recency flips at mean = 2.0. Categorical: h(X) = +0.5 target / -0.5 others (symmetric opposite-sign). """ effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance) driver_info = _resolve_driver(driver, effective_sampler) intercept = float(logit(baseline_rate)) signal_fn, extra_meta = _build_signal( driver_info, driver, driver_target_level, "reversal", cat_target_val=0.5, cat_other_val=-0.5, ) p0, p1_fn, m1_fn = _logit_additive_surfaces(intercept, signal_fn) config, metadata = _build_scenario( template_name="reversal", p0_surface=p0, p1_surface_fn=p1_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=sampler, m1_surface_fn=m1_fn, driver=driver, ) metadata.update(extra_meta) return config, metadata def _template_sparse_benefit( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, benefit_prevalence: float = 0.15, sampler: CopulaConfig | MixtureConfig | None = None, driver: str = "session_recency", driver_target_level: Any | None = None, **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Sparse benefit: only a small fraction of visitors has positive CATE. Uses threshold (continuous) or mask (categorical) — not logit-additive. """ from scipy import stats effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance) driver_info = _resolve_driver(driver, effective_sampler) extra_meta: dict = {} p0_surface = _const_surface(baseline_rate) if driver_info.kind == "continuous": _mu = driver_info.mean _std = driver_info.std cutoff = float(stats.norm(loc=_mu, scale=_std).ppf(1.0 - benefit_prevalence)) def p1_surface_fn(es: float) -> SurfaceConfig: above_p1 = min(baseline_rate + es * 0.40, 0.99) return threshold_surface( SurfaceConfig(fn=lambda X, _d=driver: X[_d].values.astype(float)), cutoff=cutoff, above=above_p1, below=baseline_rate, ) else: _t, _src = _resolve_categorical_target(driver_info, driver_target_level, "sparse_benefit") extra_meta = {"driver_target_level": _t, "driver_target_source": _src} def _benefit_mask_fn(X: pd.DataFrame, _d: str = driver, _tgt: Any = _t) -> np.ndarray: return X[_d].values == _tgt def p1_surface_fn(es: float) -> SurfaceConfig: above_p1 = min(baseline_rate + es * 0.40, 0.99) def _fn(X: pd.DataFrame, _br: float = baseline_rate, _ap: float = above_p1, _mask_fn=_benefit_mask_fn) -> np.ndarray: return np.where(_mask_fn(X), _ap, _br).astype(float) return SurfaceConfig(fn=_fn) config, metadata = _build_scenario( template_name="sparse_benefit", p0_surface=p0_surface, p1_surface_fn=p1_surface_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=sampler, driver=driver, ) metadata["benefit_prevalence"] = benefit_prevalence metadata.update(extra_meta) return config, metadata def _template_interaction_only( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, driver: str = "session_recency", driver_secondary: str = "browse_depth", **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Interaction-only: CATE dominated by driver×driver_secondary product term.""" effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance) driver_info = _resolve_driver(driver, effective_sampler) driver2_info = _resolve_driver(driver_secondary, effective_sampler) _require_continuous(driver_info, "driver", "interaction_only") _require_continuous(driver2_info, "driver_secondary", "interaction_only") intercept = float(logit(baseline_rate)) _mu1 = driver_info.mean _mu2 = driver2_info.mean def raw_signal(X, _d=driver, _ds=driver_secondary, _m1=_mu1, _m2=_mu2): return ( (X[_d].values.astype(float) - _m1) * (X[_ds].values.astype(float) - _m2) ) p0, p1_fn, _ = _logit_additive_surfaces(intercept, raw_signal) config, metadata = _build_scenario( template_name="interaction_only", p0_surface=p0, p1_surface_fn=p1_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=sampler, driver=driver, ) metadata["driver_secondary"] = driver_secondary return config, metadata def _template_nonlinear( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, driver: str = "session_recency", driver_secondary: str = "browse_depth", **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Nonlinear: quadratic + interaction treatment surface.""" effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance) driver_info = _resolve_driver(driver, effective_sampler) driver2_info = _resolve_driver(driver_secondary, effective_sampler) _require_continuous(driver_info, "driver", "nonlinear") _require_continuous(driver2_info, "driver_secondary", "nonlinear") intercept = float(logit(baseline_rate)) _mu = driver_info.mean _mu2 = driver2_info.mean def raw_signal(X, _d=driver, _ds=driver_secondary, _m=_mu, _m2=_mu2): return ( 0.5 * (X[_d].values.astype(float) - _m) ** 2 + 0.5 * (X[_d].values.astype(float) - _m) * (X[_ds].values.astype(float) - _m2) ) # p0 uses _const_surface(baseline_rate) — mathematically identical to # sigmoid(logit(baseline_rate)) but preserves the original constant form. p0_surface = _const_surface(baseline_rate) _, p1_fn, _ = _logit_additive_surfaces(intercept, raw_signal) config, metadata = _build_scenario( template_name="nonlinear", p0_surface=p0_surface, p1_surface_fn=p1_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=sampler, driver=driver, ) metadata["surface_type"] = "quadratic_interaction" metadata["driver_secondary"] = driver_secondary return config, metadata def _build_clustered_surfaces( mixture_sampler: MixtureConfig, profiles: tuple[ClusterEffectProfile, ...], driver_info_map: dict[str, DriverInfo], *, baseline_rate: float, effect_scale: float, hte_scale: float, max_aov_ratio: float = 200.0, ) -> tuple[SurfaceConfig, SurfaceConfig, Callable[[float], SurfaceConfig], Callable[[float], SurfaceConfig]]: """Build (p0_surface, m0_surface, p1_surface_fn, m1_surface_fn) for clustered template. The latent cluster type drives both prognostic outcomes (p0, m0) and treatment response (p1, m1). Per-cluster prognostic baselines and within-cluster signal terms live on ``ClusterEffectProfile``. Observed features are noisy windows into the latent type — BCF only sees features, not cluster_id. Parameters ---------- mixture_sampler: MixtureConfig whose cluster_id column is available in X during surface evaluation. profiles: Per-cluster effect + prognostic specifications. Must have the same length as ``mixture_sampler.weights``. driver_info_map: Resolved DriverInfo for every feature referenced in any SignalTerm across all profiles (including prognostic signals). baseline_rate: Global baseline conversion probability. Per-cluster prognostic_conv_base shifts are centered so the weighted mean stays at this rate. effect_scale: Global CATE magnitude scaling. hte_scale: Within-cluster HTE signal amplitude scaling. ``hte_scale=0`` suppresses all within-cluster treatment gradients; ``hte_scale=1`` applies full amplitudes. Returns ------- tuple ``(p0_surface, m0_surface, p1_surface_fn, m1_surface_fn)`` where: - ``p0_surface`` has per-cluster baselines + within-cluster prognostic signals. - ``m0_surface`` has per-cluster baselines + within-cluster prognostic signals. - ``p1_surface_fn(es)`` builds treatment conversion on top of heterogeneous p0. - ``m1_surface_fn(es)`` builds treatment severity on top of heterogeneous m0. """ n_clusters = len(profiles) _m0_base = 50.0 # Pre-compute per-cluster treatment logit shifts and severity factors. _conv_shifts: list[float] = [ _relative_lift_to_logit_shift(profiles[k].conversion_delta, baseline_rate) for k in range(n_clusters) ] _sev_factors: list[float] = [ _severity_multiplicative_factor(profiles[k].severity_delta) for k in range(n_clusters) ] _logit_p0 = float(logit(baseline_rate)) _log_m0 = float(np.log(_m0_base)) def _eval_all_signals( X: pd.DataFrame, signals: tuple[SignalTerm, ...], _hs: float, _di: dict[str, DriverInfo], ) -> np.ndarray: """Sum all signal terms, scaled by hte_scale.""" if not signals or _hs == 0.0: return np.zeros(len(X), dtype=float) total = np.zeros(len(X), dtype=float) for term in signals: total += _evaluate_signal(term, X, _di) return _hs * total def _eval_prognostic_signals( X: pd.DataFrame, signals: tuple[SignalTerm, ...], _di: dict[str, DriverInfo], ) -> np.ndarray: """Sum prognostic signal terms (no hte_scale — amplitudes are baked in).""" if not signals: return np.zeros(len(X), dtype=float) total = np.zeros(len(X), dtype=float) for term in signals: total += _evaluate_signal(term, X, _di) return total # Collect all driver column names used by any signal across all profiles # (HTE + prognostic), so surface functions can slice only the needed columns. _driver_cols: set[str] = set() for prof in profiles: for sig in (prof.conversion_signals + prof.severity_signals + prof.prognostic_conv_signals + prof.prognostic_sev_signals): _driver_cols.update(sig.drivers) # Always need cluster_id for masking. _driver_cols.add("cluster_id") _driver_col_list: list[str] = sorted(_driver_cols) # --- p0 surface: per-cluster baselines + within-cluster prognostic signals --- def _p0_fn( X: pd.DataFrame, _lp0: float = _logit_p0, _profs: tuple[ClusterEffectProfile, ...] = profiles, _di: dict[str, DriverInfo] = driver_info_map, _cols: list[str] = _driver_col_list, ) -> np.ndarray: X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) logit_p0 = np.full(len(X), _lp0, dtype=float) for k, prof in enumerate(_profs): mask = np.asarray(cid == k) if not np.any(mask): continue # Latent type component: per-cluster baseline shift logit_p0[mask] += prof.prognostic_conv_base # Within-cluster feature gradients logit_p0[mask] += _eval_prognostic_signals( X_slim.loc[mask], prof.prognostic_conv_signals, _di, ) return expit(logit_p0) p0_surface = SurfaceConfig(fn=_p0_fn) # --- m0 surface: per-cluster baselines + within-cluster prognostic signals --- # Clamp log-space to [m0_base/ratio, m0_base*ratio] before exp() _log_m0_floor = float(np.log(_m0_base / max_aov_ratio)) _log_m0_ceil = float(np.log(_m0_base * max_aov_ratio)) def _m0_fn( X: pd.DataFrame, _lm0: float = _log_m0, _profs: tuple[ClusterEffectProfile, ...] = profiles, _di: dict[str, DriverInfo] = driver_info_map, _cols: list[str] = _driver_col_list, _floor: float = _log_m0_floor, _ceil: float = _log_m0_ceil, ) -> np.ndarray: X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) log_m0 = np.full(len(X), _lm0, dtype=float) for k, prof in enumerate(_profs): mask = np.asarray(cid == k) if not np.any(mask): continue log_m0[mask] += prof.prognostic_sev_base log_m0[mask] += _eval_prognostic_signals( X_slim.loc[mask], prof.prognostic_sev_signals, _di, ) np.clip(log_m0, _floor, _ceil, out=log_m0) return np.exp(log_m0) m0_surface = SurfaceConfig(fn=_m0_fn) # --- p1 surface: treatment delta on top of heterogeneous p0 --- def p1_surface_fn(es: float) -> SurfaceConfig: def _fn( X: pd.DataFrame, _es: float = es, _hs: float = hte_scale, _shifts: list[float] = _conv_shifts, _profs: tuple[ClusterEffectProfile, ...] = profiles, _di: dict[str, DriverInfo] = driver_info_map, _cols: list[str] = _driver_col_list, _p0: Callable = _p0_fn, ) -> np.ndarray: # Start from individual heterogeneous baselines p0_vals = _p0(X) logit_p1 = logit(p0_vals) X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) for k, (shift, prof) in enumerate(zip(_shifts, _profs, strict=False)): mask = np.asarray(cid == k) if not np.any(mask): continue X_k = X_slim.loc[mask] within = _eval_all_signals(X_k, prof.conversion_signals, _hs, _di) logit_p1[mask] += _es * (shift + within) return expit(logit_p1) return SurfaceConfig(fn=_fn) # --- m1 surface: treatment delta on top of heterogeneous m0 --- def m1_surface_fn(es: float) -> SurfaceConfig: def _fn( X: pd.DataFrame, _es: float = es, _hs: float = hte_scale, _factors: list[float] = _sev_factors, _profs: tuple[ClusterEffectProfile, ...] = profiles, _di: dict[str, DriverInfo] = driver_info_map, _cols: list[str] = _driver_col_list, _m0: Callable = _m0_fn, ) -> np.ndarray: # Start from individual heterogeneous baselines m0_vals = _m0(X) X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) m1 = np.copy(m0_vals) for k, (factor, prof) in enumerate(zip(_factors, _profs, strict=False)): mask = np.asarray(cid == k) if not np.any(mask): continue X_k = X_slim.loc[mask] within = _eval_all_signals(X_k, prof.severity_signals, _hs, _di) cluster_delta = _es * ((factor - 1.0) + within) m1[mask] = np.maximum(m0_vals[mask] * (1.0 + cluster_delta), 1e-3) return m1 return SurfaceConfig(fn=_fn) return p0_surface, m0_surface, p1_surface_fn, m1_surface_fn def _collect_driver_info_for_profiles( profiles: tuple[ClusterEffectProfile, ...], mixture_sampler: MixtureConfig, ) -> dict[str, DriverInfo]: """Resolve DriverInfo for all features referenced in any SignalTerm across all profiles.""" driver_names: set[str] = set() for prof in profiles: for term in (prof.conversion_signals + prof.severity_signals + prof.prognostic_conv_signals + prof.prognostic_sev_signals): driver_names.update(term.drivers) return {name: _resolve_driver(name, mixture_sampler) for name in driver_names} def _template_clustered( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, n_correlated: int = 0, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, hte_scale: float = 1.0, driver: str = "session_recency", driver_target_level: Any | None = None, **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Clustered: generalized per-cluster logit-additive surfaces with 2-cluster MixtureConfig. Uses ``_CLUSTERED_DEFAULT_PROFILES`` and ``_build_clustered_sampler`` to build surfaces via the generalized ``_build_clustered_surfaces`` helper. Parameters ---------- hte_scale: Within-cluster signal amplitude scaling. ``0`` = step function (cluster-level deltas only); ``1`` = full within-cluster gradients active. n_correlated: Number of cluster-correlated nuisance features (default 0). """ if sampler is not None: raise ValueError( "clustered template uses its own MixtureConfig sampler; " "external sampler= override is not supported" ) mixture_sampler = _build_clustered_sampler(n_nuisance, n_correlated) profiles = _CLUSTERED_DEFAULT_PROFILES driver_info_map = _collect_driver_info_for_profiles(profiles, mixture_sampler) p0_surface, _m0_surf, p1_surface_fn, m1_surface_fn = _build_clustered_surfaces( mixture_sampler, profiles, driver_info_map, baseline_rate=baseline_rate, effect_scale=effect_scale, hte_scale=hte_scale, ) config, metadata = _build_scenario( template_name="clustered", p0_surface=p0_surface, p1_surface_fn=p1_surface_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=mixture_sampler, m1_surface_fn=m1_surface_fn, driver=driver, ) metadata["n_clusters"] = len(mixture_sampler.weights) metadata["hte_scale"] = hte_scale metadata["n_correlated"] = n_correlated return config, metadata def _build_clustered_surfaces_marm( mixture_sampler: MixtureConfig, profiles: tuple[ClusterEffectProfile, ...], driver_info_map: dict[str, DriverInfo], *, baseline_rate: float, effect_scale: float, hte_scale: float, K: int, max_aov_ratio: float = 200.0, ) -> tuple[ list[SurfaceConfig], list[SurfaceConfig] | None, list[SurfaceConfig] | None, ]: """Build K per-treatment-level potential-outcome surfaces for K-arm clustered_realistic. Constructs: - ``p_surfaces[0]``: heterogeneous control conversion (identical to _build_clustered_surfaces p0). - ``p_surfaces[k]`` for k=1..K-1: treatment-k conversion built on top of p0. - ``m_surfaces[0]``: heterogeneous control severity (from per-cluster baselines). - ``m_surfaces[k]`` for k=1..K-1: treatment-k severity built on top of m0. - ``sigma_surfaces[k]``: constant severity_sigma for all k (hardcoded 0.5 by default; the caller passes severity_sigma as part of surface construction). Treatment-response scheme: For treatment level t (t = 1..K-1), the per-cluster responses are derived from _CLUSTERED_REALISTIC_PROFILES by: 1. Cyclic-shifting the base cluster assignment by s = (t-1) % n_clusters, so treatment t at cluster c uses base profile[(c - s) % n_clusters]'s treatment signals. This gives genuinely independent CATE patterns across treatments. 2. Scaling the per-cluster conversion_delta and severity_delta by mag_t = linspace(0.7, 1.3, K-1)[t-1], ensuring treatments differ even when shifts repeat at high K. Parameters ---------- mixture_sampler: MixtureConfig whose cluster_id column is available in X during surface evaluation. profiles: Base per-cluster profiles (length = n_clusters = 4 for clustered_realistic). driver_info_map: Resolved DriverInfo for all signal drivers. baseline_rate: Global baseline conversion probability. effect_scale: Global CATE magnitude scaling. hte_scale: Within-cluster signal amplitude scaling. K: Number of treatment levels (K >= 3). max_aov_ratio: Clamp ratio for severity log-space. Returns ------- (p_surfaces, m_surfaces, sigma_surfaces) Each is a list of K SurfaceConfig objects. For conversion_rate callers, m_surfaces and sigma_surfaces are None — callers should pass severity_sigma to build sigma_surfaces if needed. Here we always return all three; the caller drops m_surfaces/sigma_surfaces when metric_id == "conversion_rate". """ n_clusters = len(profiles) _m0_base = 50.0 _log_m0 = float(np.log(_m0_base)) _logit_p0 = float(logit(baseline_rate)) _log_m0_floor = float(np.log(_m0_base / max_aov_ratio)) _log_m0_ceil = float(np.log(_m0_base * max_aov_ratio)) # Per-treatment magnitude scaling (endpoints are exact, so K=3 yields # exactly [0.7, 1.3]). _mags = np.linspace(0.7, 1.3, K - 1) # Derive K-1 treatment-response tuples (one per treatment level t=1..K-1). # Each tuple: (per_cluster_conv_shifts, per_cluster_sev_factors, per_cluster_signals) # per_cluster_conv_shifts[c] = logit shift for cluster c under treatment t. # per_cluster_sev_factors[c] = multiplicative severity factor for cluster c. # per_cluster_conv_signals[c] = tuple of conversion SignalTerms for cluster c. # per_cluster_sev_signals[c] = tuple of severity SignalTerms for cluster c. _treatment_responses: list[ tuple[list[float], list[float], list[tuple[SignalTerm, ...]], list[tuple[SignalTerm, ...]]] ] = [] for t in range(1, K): mag = float(_mags[t - 1]) s = (t - 1) % n_clusters conv_shifts: list[float] = [] sev_factors: list[float] = [] conv_sigs: list[tuple[SignalTerm, ...]] = [] sev_sigs: list[tuple[SignalTerm, ...]] = [] for c in range(n_clusters): src = (c - s) % n_clusters base_prof = profiles[src] scaled_delta = base_prof.conversion_delta * mag scaled_sev = base_prof.severity_delta * mag conv_shifts.append( _relative_lift_to_logit_shift(scaled_delta, baseline_rate) ) sev_factors.append( _severity_multiplicative_factor( max(scaled_sev, -0.9) # clamp so factor stays positive ) ) conv_sigs.append(base_prof.conversion_signals) sev_sigs.append(base_prof.severity_signals) _treatment_responses.append((conv_shifts, sev_factors, conv_sigs, sev_sigs)) # --- Collect driver cols needed (union across all profiles) --- _driver_cols: set[str] = set() for prof in profiles: for sig in (prof.conversion_signals + prof.severity_signals + prof.prognostic_conv_signals + prof.prognostic_sev_signals): _driver_cols.update(sig.drivers) _driver_cols.add("cluster_id") _driver_col_list: list[str] = sorted(_driver_cols) # --- Inline helpers (replicated from _build_clustered_surfaces, not shared) --- def _eval_all_signals_marm( X: pd.DataFrame, signals: tuple[SignalTerm, ...], _hs: float, _di: dict[str, DriverInfo], ) -> np.ndarray: if not signals or _hs == 0.0: return np.zeros(len(X), dtype=float) total = np.zeros(len(X), dtype=float) for term in signals: total += _evaluate_signal(term, X, _di) return _hs * total def _eval_prognostic_signals_marm( X: pd.DataFrame, signals: tuple[SignalTerm, ...], _di: dict[str, DriverInfo], ) -> np.ndarray: if not signals: return np.zeros(len(X), dtype=float) total = np.zeros(len(X), dtype=float) for term in signals: total += _evaluate_signal(term, X, _di) return total # --- Control surfaces (k=0): same construction as _build_clustered_surfaces p0/m0 --- def _p0_fn_marm( X: pd.DataFrame, _lp0: float = _logit_p0, _profs: tuple[ClusterEffectProfile, ...] = profiles, _di: dict[str, DriverInfo] = driver_info_map, _cols: list[str] = _driver_col_list, ) -> np.ndarray: X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) logit_p0 = np.full(len(X), _lp0, dtype=float) for k, prof in enumerate(_profs): mask = np.asarray(cid == k) if not np.any(mask): continue logit_p0[mask] += prof.prognostic_conv_base logit_p0[mask] += _eval_prognostic_signals_marm( X_slim.loc[mask], prof.prognostic_conv_signals, _di, ) return expit(logit_p0) p0_surface = SurfaceConfig(fn=_p0_fn_marm) def _m0_fn_marm( X: pd.DataFrame, _lm0: float = _log_m0, _profs: tuple[ClusterEffectProfile, ...] = profiles, _di: dict[str, DriverInfo] = driver_info_map, _cols: list[str] = _driver_col_list, _floor: float = _log_m0_floor, _ceil: float = _log_m0_ceil, ) -> np.ndarray: X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) log_m0 = np.full(len(X), _lm0, dtype=float) for k, prof in enumerate(_profs): mask = np.asarray(cid == k) if not np.any(mask): continue log_m0[mask] += prof.prognostic_sev_base log_m0[mask] += _eval_prognostic_signals_marm( X_slim.loc[mask], prof.prognostic_sev_signals, _di, ) np.clip(log_m0, _floor, _ceil, out=log_m0) return np.exp(log_m0) m0_surface = SurfaceConfig(fn=_m0_fn_marm) # --- Treatment surfaces (k=1..K-1) --- p_treatment_surfaces: list[SurfaceConfig] = [] m_treatment_surfaces: list[SurfaceConfig] = [] for t in range(1, K): t_conv_shifts, t_sev_factors, t_conv_sigs, t_sev_sigs = _treatment_responses[t - 1] def _make_pk_surface( _es: float, _hs: float, _shifts: list[float], _csigs: list[tuple[SignalTerm, ...]], _di: dict[str, DriverInfo], _cols: list[str], _p0_fn: Callable[[pd.DataFrame], np.ndarray], _nc: int, ) -> SurfaceConfig: def _fn( X: pd.DataFrame, _es: float = _es, _hs: float = _hs, _shifts: list[float] = _shifts, _csigs: list[tuple[SignalTerm, ...]] = _csigs, _di: dict[str, DriverInfo] = _di, _cols: list[str] = _cols, _p0_fn: Callable[[pd.DataFrame], np.ndarray] = _p0_fn, _nc: int = _nc, ) -> np.ndarray: p0_vals = _p0_fn(X) logit_pk = logit(p0_vals) X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) for c in range(_nc): mask = np.asarray(cid == c) if not np.any(mask): continue X_c = X_slim.loc[mask] within = _eval_all_signals_marm(X_c, _csigs[c], _hs, _di) logit_pk[mask] += _es * (_shifts[c] + within) return expit(logit_pk) return SurfaceConfig(fn=_fn) p_treatment_surfaces.append(_make_pk_surface( effect_scale, hte_scale, t_conv_shifts, t_conv_sigs, driver_info_map, _driver_col_list, _p0_fn_marm, n_clusters, )) def _make_mk_surface( _es: float, _hs: float, _factors: list[float], _ssigs: list[tuple[SignalTerm, ...]], _di: dict[str, DriverInfo], _cols: list[str], _m0_fn: Callable[[pd.DataFrame], np.ndarray], _nc: int, ) -> SurfaceConfig: def _fn( X: pd.DataFrame, _es: float = _es, _hs: float = _hs, _factors: list[float] = _factors, _ssigs: list[tuple[SignalTerm, ...]] = _ssigs, _di: dict[str, DriverInfo] = _di, _cols: list[str] = _cols, _m0_fn: Callable[[pd.DataFrame], np.ndarray] = _m0_fn, _nc: int = _nc, ) -> np.ndarray: m0_vals = _m0_fn(X) X_slim = cast(pd.DataFrame, X[_cols]) cid = np.asarray(X_slim["cluster_id"]) mk = np.copy(m0_vals) for c in range(_nc): mask = np.asarray(cid == c) if not np.any(mask): continue X_c = X_slim.loc[mask] within = _eval_all_signals_marm(X_c, _ssigs[c], _hs, _di) cluster_delta = _es * ((_factors[c] - 1.0) + within) mk[mask] = np.maximum(m0_vals[mask] * (1.0 + cluster_delta), 1e-3) return mk return SurfaceConfig(fn=_fn) m_treatment_surfaces.append(_make_mk_surface( effect_scale, hte_scale, t_sev_factors, t_sev_sigs, driver_info_map, _driver_col_list, _m0_fn_marm, n_clusters, )) p_surfaces: list[SurfaceConfig] = [p0_surface] + p_treatment_surfaces m_surfaces_list: list[SurfaceConfig] = [m0_surface] + m_treatment_surfaces return p_surfaces, m_surfaces_list, None # sigma_surfaces built by caller def _template_clustered_realistic( metric_id: str, effect_scale: float, *, K: int = 2, n_visitors: int = 1000, n_nuisance: int = 5, n_correlated: int = 0, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, hte_scale: float = 1.0, **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Clustered realistic: 4-cluster MixtureConfig with additive within-cluster signals. Primary scenario for sequential targeting validation. Uses linear and interaction signal terms across the 4 e-commerce segments. Parameters ---------- K: Number of treatment levels (control + K−1 treatments). ``K=2`` (default) preserves the existing binary shape (bit-identical to the pre-change implementation). ``K>=3`` produces a multi-treatment config carrying one potential-outcome surface per treatment level. hte_scale: Within-cluster signal amplitude scaling. ``0`` = step function; ``1`` = full gradients. n_correlated: Number of cluster-correlated nuisance features (default 0). """ if K < 2: raise ValueError( f"clustered_realistic: K must be >= 2, got K={K}. " "K < 2 is not a valid experiment." ) if sampler is not None: raise ValueError( "clustered_realistic template uses its own MixtureConfig sampler; " "external sampler= override is not supported" ) mixture_sampler = _build_4cluster_ecommerce(n_nuisance, n_correlated) profiles = _CLUSTERED_REALISTIC_PROFILES driver_info_map = _collect_driver_info_for_profiles(profiles, mixture_sampler) if K == 2: # Existing K=2 path — unchanged, bit-identical to pre-change implementation. p0_surface, _m0_surf, p1_surface_fn, m1_surface_fn = _build_clustered_surfaces( mixture_sampler, profiles, driver_info_map, baseline_rate=baseline_rate, effect_scale=effect_scale, hte_scale=hte_scale, ) config, metadata = _build_scenario( template_name="clustered_realistic", p0_surface=p0_surface, p1_surface_fn=p1_surface_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=mixture_sampler, m1_surface_fn=m1_surface_fn, driver="session_recency", ) metadata["n_clusters"] = len(mixture_sampler.weights) metadata["hte_scale"] = hte_scale metadata["n_correlated"] = n_correlated return config, metadata # K >= 3: multi-arm path. p_surfaces, m_surfaces_all, _ = _build_clustered_surfaces_marm( mixture_sampler, profiles, driver_info_map, baseline_rate=baseline_rate, effect_scale=effect_scale, hte_scale=hte_scale, K=K, ) # Build uniform arm allocation: [1/K, ..., 1/K], last entry normalized to sum to 1. _frac = 1.0 / K arm_alloc_list = [_frac] * (K - 1) arm_alloc_list.append(1.0 - sum(arm_alloc_list)) treatment_probabilities = tuple(arm_alloc_list) if metric_id == "conversion_rate": metric_mode = MetricMode( metric_id="conversion_rate", p_surfaces=p_surfaces, ) elif metric_id == "revenue_per_visitor": sigma_surfaces = [_const_surface(severity_sigma) for _ in range(K)] assert m_surfaces_all is not None metric_mode = MetricMode( metric_id="revenue_per_visitor", p_surfaces=p_surfaces, m_surfaces=m_surfaces_all, sigma_surfaces=sigma_surfaces, ) else: raise ValueError( f"clustered_realistic: unsupported metric_id {metric_id!r}. " "Must be 'conversion_rate' or 'revenue_per_visitor'." ) assignment = AssignmentConfig(treatment_probabilities=treatment_probabilities) config = V2GeneratorConfig( n_visitors=n_visitors, feature_sampler=mixture_sampler, metric_mode=metric_mode, assignment=assignment, seed=seed, experiment_id=f"clustered_realistic-es{effect_scale}-seed{seed}-K{K}", revenue_model=None, ) metadata: dict[str, str | int | float] = { "scenario_class": "clustered_realistic", "template": "clustered_realistic", "metric_id": metric_id, "effect_scale": effect_scale, "n_visitors": n_visitors, "seed": seed, "baseline_rate": baseline_rate, "n_nuisance": n_nuisance, "revenue_model": "lognormal", "driver": "session_recency", "n_clusters": len(mixture_sampler.weights), "hte_scale": hte_scale, "n_correlated": n_correlated, "K": K, } # NOTE: no scalar "treatment_allocation" key at K>=3 — the fixed forced # per-treatment assignment lives on config.assignment.treatment_probabilities. # (A sequential experiment routes treatments by cell policy, not this vector.) return config, metadata def _template_clustered_complex( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, n_correlated: int = 0, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, hte_scale: float = 1.0, **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Clustered complex: 6-cluster MixtureConfig with all signal types. Stress test for deep policy trees. Includes linear, interaction, and quadratic signal terms, with reversed amplitudes across clusters. Parameters ---------- hte_scale: Within-cluster signal amplitude scaling. ``0`` = step function; ``1`` = full gradients. n_correlated: Number of cluster-correlated nuisance features (default 0). """ if sampler is not None: raise ValueError( "clustered_complex template uses its own MixtureConfig sampler; " "external sampler= override is not supported" ) mixture_sampler = _build_6cluster_ecommerce(n_nuisance, n_correlated) profiles = _CLUSTERED_COMPLEX_PROFILES driver_info_map = _collect_driver_info_for_profiles(profiles, mixture_sampler) p0_surface, _m0_surf, p1_surface_fn, m1_surface_fn = _build_clustered_surfaces( mixture_sampler, profiles, driver_info_map, baseline_rate=baseline_rate, effect_scale=effect_scale, hte_scale=hte_scale, ) config, metadata = _build_scenario( template_name="clustered_complex", p0_surface=p0_surface, p1_surface_fn=p1_surface_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=mixture_sampler, m1_surface_fn=m1_surface_fn, driver="session_recency", ) metadata["n_clusters"] = len(mixture_sampler.weights) metadata["hte_scale"] = hte_scale metadata["n_correlated"] = n_correlated return config, metadata def _template_monotone_gradient( metric_id: str, effect_scale: float, *, n_visitors: int = 1000, n_nuisance: int = 5, treatment_allocation: float = 0.5, seed: int = 42, baseline_rate: float = 0.05, severity_sigma: float = 0.5, sampler: CopulaConfig | MixtureConfig | None = None, revenue_model: str | CartRevenueConfig = "lognormal", channel_mode: str | None = None, prognostic_features: list[str] | None = None, prescriptive_features: list[str] | None = None, driver: str = "session_recency", driver_target_level: str | None = None, severity_driver: str = "browse_depth", **_kwargs, ) -> tuple[V2GeneratorConfig, dict]: """Monotone gradient: CATE increases smoothly with the driver feature. Continuous: h(X) = 2.0 × (X[driver] - mean) (dose-response curve). Categorical: h(X) = +1.0 target / 0.0 others (step function). """ effective_sampler = sampler if sampler is not None else _build_default_mixture_sampler(n_nuisance) driver_info = _resolve_driver(driver, effective_sampler) intercept = float(logit(baseline_rate)) signal_fn, extra_meta = _build_signal( driver_info, driver, driver_target_level, "monotone_gradient", cat_target_val=1.0, cat_other_val=0.0, ) p0, p1_fn, m1_fn = _logit_additive_surfaces( intercept, signal_fn, conversion_slope=2.0, ) config, metadata = _build_scenario( template_name="monotone_gradient", p0_surface=p0, p1_surface_fn=p1_fn, metric_id=metric_id, effect_scale=effect_scale, severity_sigma=severity_sigma, n_visitors=n_visitors, n_nuisance=n_nuisance, treatment_allocation=treatment_allocation, seed=seed, baseline_rate=baseline_rate, sampler=sampler, revenue_model=revenue_model, m1_surface_fn=m1_fn, channel_mode=channel_mode, prognostic_features=prognostic_features, prescriptive_features=prescriptive_features, driver=driver, severity_driver=severity_driver, ) metadata.update(extra_meta) return config, metadata # --------------------------------------------------------------------------- # TEMPLATES dict (Task 3.2) # --------------------------------------------------------------------------- #: Registered scenario templates. #: Keys are template names; values are callables accepting #: ``(metric_id: str, effect_scale: float, **kwargs)`` and returning #: ``(V2GeneratorConfig, dict)``. TEMPLATES: dict[str, Callable[..., tuple[V2GeneratorConfig, dict]]] = { "constant": _template_constant, "reversal": _template_reversal, "sparse_benefit": _template_sparse_benefit, "nonlinear": _template_nonlinear, "monotone_gradient": _template_monotone_gradient, "interaction_only": _template_interaction_only, "clustered": _template_clustered, "clustered_realistic": _template_clustered_realistic, "clustered_complex": _template_clustered_complex, } # --------------------------------------------------------------------------- # Canonical sweep contract type alias # --------------------------------------------------------------------------- #: Tuple of ``(scenario_fn, kwargs)``. Each pair can be invoked as #: ``scenario_fn(seed=seed, **kwargs)`` to produce a #: ``(V2GeneratorConfig, dict)`` result. The kwargs dict MUST NOT include #: ``seed`` — seed is injected by ``run_suite_parallel``. ScenarioSpec = tuple[Callable[..., tuple[V2GeneratorConfig, dict]], dict[str, Any]] # --------------------------------------------------------------------------- # build_catalog (Tasks 18.1, 9.2) # --------------------------------------------------------------------------- # Hurdle metric IDs — channel_mode applies only to these. _HURDLE_METRICS = {"revenue_per_visitor"} # Marginal sweep relevance mapping: # key → (reference_templates, reference_metrics, kwarg_name) # "both" = all metric_ids provided by the caller. _MARGINAL_SWEEP_REF: dict[str, tuple[str, str]] = { # (reference_template, reference_metric_set) # Pairs are (template_name, "hurdle" | "both") "severity_sigma_grid": ("constant", "hurdle"), "n_nuisance_grid": ("constant", "both"), "n_visitors_grid": ("constant", "both"), "treatment_allocation_grid": ("constant", "both"), "benefit_prevalence_grid": ("sparse_benefit", "both"), "baseline_rate_grid": ("constant", "both"), "revenue_model_grid": ("constant", "hurdle"), } # Maps each marginal grid parameter name to the template kwarg name it controls. _MARGINAL_GRID_TO_KWARG: dict[str, str] = { "severity_sigma_grid": "severity_sigma", "n_nuisance_grid": "n_nuisance", "n_visitors_grid": "n_visitors", "treatment_allocation_grid": "treatment_allocation", "benefit_prevalence_grid": "benefit_prevalence", "baseline_rate_grid": "baseline_rate", "revenue_model_grid": "revenue_model", }
[docs] def build_catalog( *, templates: list[str] | None = None, metric_ids: list[str] | None = None, effect_scale_grid: list[float], # Optional marginal sweep grids severity_sigma_grid: list[float] | None = None, n_nuisance_grid: list[int] | None = None, n_visitors_grid: list[int] | None = None, treatment_allocation_grid: list[float] | None = None, benefit_prevalence_grid: list[float] | None = None, baseline_rate_grid: list[float] | None = None, revenue_model_grid: list[Any] | None = None, channel_mode_grid: list[str] | None = None, # Defaults for non-swept dimensions default_effect_scale: float = 1.0, default_n_visitors: int = 1000, default_severity_sigma: float = 0.5, default_n_nuisance: int = 5, default_treatment_allocation: float = 0.5, default_benefit_prevalence: float = 0.15, default_baseline_rate: float = 0.05, # Driver coverage categorical_driver: str | None = None, ) -> list[ScenarioSpec]: """Build a star-sweep catalog of ``(scenario_fn, kwargs)`` tuples. Parameters ---------- templates Template names to include. Defaults to all 7. metric_ids Metric IDs to include. Defaults to both ``"conversion_rate"`` and ``"revenue_per_visitor"``. effect_scale_grid Required. Core grid dimension — all templates × metrics are crossed with every value in this list. severity_sigma_grid Optional marginal sweep grid for ``severity_sigma``. n_nuisance_grid Optional marginal sweep grid for ``n_nuisance``. n_visitors_grid Optional marginal sweep grid for ``n_visitors``. treatment_allocation_grid Optional marginal sweep grid for ``treatment_allocation``. benefit_prevalence_grid Optional marginal sweep grid for ``benefit_prevalence``. baseline_rate_grid Optional marginal sweep grid for ``baseline_rate``. revenue_model_grid Optional marginal sweep grid for revenue-model variants. channel_mode_grid For hurdle metrics, crosses channel_mode into the core grid. Defaults to ``["mirror"]`` for backward compatibility. default_effect_scale ``effect_scale`` used for marginal sweeps (default 1.0). default_n_visitors Default visitor count for non-swept dimensions. default_severity_sigma Default severity sigma for non-swept dimensions. default_n_nuisance Default nuisance-feature count for non-swept dimensions. default_treatment_allocation Default treatment allocation for non-swept dimensions. default_benefit_prevalence Default benefit prevalence for non-swept dimensions. default_baseline_rate Default baseline rate for non-swept dimensions. categorical_driver When provided, one additional ``reversal`` spec with this driver is appended per metric family (binary and hurdle) before the coverage gate is validated. Must be a categorical or binary feature from the default schema (e.g. ``"is_returning"``). Named profiles set this to ``"is_returning"`` so they satisfy the coverage gate by construction. Returns ------- list of ScenarioSpec List of ``(scenario_fn, kwargs)`` tuples consumable by ``run_suite_parallel``. The kwargs dicts do NOT include ``seed``. Raises ------ ValueError If the assembled spec list does not include at least one continuous-driver and one categorical-driver (binary counts as categorical) scenario per metric family. """ if templates is None: templates = list(TEMPLATES.keys()) if metric_ids is None: metric_ids = ["conversion_rate", "revenue_per_visitor"] _channel_mode_grid = channel_mode_grid if channel_mode_grid is not None else ["mirror"] # Base kwargs shared by all entries (defaults for non-swept dimensions). _base_kwargs: dict[str, Any] = { "n_visitors": default_n_visitors, "severity_sigma": default_severity_sigma, "n_nuisance": default_n_nuisance, "treatment_allocation": default_treatment_allocation, "benefit_prevalence": default_benefit_prevalence, "baseline_rate": default_baseline_rate, } specs: list[ScenarioSpec] = [] # ------------------------------------------------------------------ # Core grid: template × metric_id × effect_scale # For hurdle metrics, also cross with channel_mode_grid. # ------------------------------------------------------------------ for tmpl_name in templates: tmpl_fn = TEMPLATES[tmpl_name] for metric_id in metric_ids: for es in effect_scale_grid: if metric_id in _HURDLE_METRICS: # Cross with channel_mode_grid for hurdle. for cm in _channel_mode_grid: kwargs: dict[str, Any] = { **_base_kwargs, "metric_id": metric_id, "effect_scale": es, "channel_mode": cm, } specs.append((tmpl_fn, kwargs)) else: kwargs = { **_base_kwargs, "metric_id": metric_id, "effect_scale": es, } specs.append((tmpl_fn, kwargs)) # ------------------------------------------------------------------ # Marginal sweeps: one dimension at a time, against relevant ref. # ------------------------------------------------------------------ marginal_grids: dict[str, list[Any] | None] = { "severity_sigma_grid": severity_sigma_grid, "n_nuisance_grid": n_nuisance_grid, "n_visitors_grid": n_visitors_grid, "treatment_allocation_grid": treatment_allocation_grid, "benefit_prevalence_grid": benefit_prevalence_grid, "baseline_rate_grid": baseline_rate_grid, "revenue_model_grid": revenue_model_grid, } for grid_name, grid_values in marginal_grids.items(): if grid_values is None: continue ref_tmpl_name, ref_metric_scope = _MARGINAL_SWEEP_REF[grid_name] kwarg_name = _MARGINAL_GRID_TO_KWARG[grid_name] # Determine which metrics this sweep runs against. if ref_metric_scope == "hurdle": ref_metrics = [m for m in metric_ids if m in _HURDLE_METRICS] else: ref_metrics = list(metric_ids) # Skip if the reference template is not in the requested templates. if ref_tmpl_name not in templates: continue ref_tmpl_fn = TEMPLATES[ref_tmpl_name] for value in grid_values: # Skip if this value matches the default (already in core grid). default_val = _base_kwargs.get(kwarg_name) if default_val is not None and value == default_val: continue for metric_id in ref_metrics: kwargs = { **_base_kwargs, "metric_id": metric_id, "effect_scale": default_effect_scale, kwarg_name: value, } # For hurdle marginal sweeps, include default channel_mode. if metric_id in _HURDLE_METRICS: kwargs["channel_mode"] = _channel_mode_grid[0] specs.append((ref_tmpl_fn, kwargs)) # ------------------------------------------------------------------ # Categorical driver injection (D5): when categorical_driver is # provided, add one reversal spec per metric family with that driver. # This ensures profiles satisfy the coverage gate by construction. # ------------------------------------------------------------------ if categorical_driver is not None: # Validate the categorical driver against the default sampler. _cat_sampler = _build_default_mixture_sampler(default_n_nuisance) cat_info = _resolve_driver(categorical_driver, _cat_sampler) if cat_info.kind == "continuous": raise ValueError( f"build_catalog: categorical_driver={categorical_driver!r} resolved as " f"continuous. Must be a categorical or binary feature." ) assert cat_info.levels is not None # non-continuous drivers carry levels cat_target = cat_info.levels[0] for metric_id in metric_ids: cat_kwargs: dict[str, Any] = { **_base_kwargs, "metric_id": metric_id, "effect_scale": default_effect_scale, "driver": categorical_driver, "driver_target_level": cat_target, } if metric_id in _HURDLE_METRICS: cat_kwargs["channel_mode"] = _channel_mode_grid[0] specs.append((_template_reversal, cat_kwargs)) # ------------------------------------------------------------------ # Coverage gate (D5): validate that the assembled specs include at # least one continuous-driver and one categorical-driver (binary counts # as categorical) scenario per metric family. # ------------------------------------------------------------------ _coverage: dict[str, set[str]] = { "binary": set(), # metric family: metric_id NOT in _HURDLE_METRICS "hurdle": set(), # metric family: metric_id IN _HURDLE_METRICS } _default_sampler = _build_default_mixture_sampler(default_n_nuisance) for fn, kw in specs: metric_id = kw.get("metric_id", "") family = "hurdle" if metric_id in _HURDLE_METRICS else "binary" # Determine effective driver and kind. explicit_driver = kw.get("driver") if explicit_driver is not None: _drv_info = _resolve_driver(explicit_driver, _default_sampler) driver_coverage_kind = "categorical" if _drv_info.kind == "categorical" else "continuous" _coverage[family].add(driver_coverage_kind) elif fn is _template_constant: # constant has no driver — excluded from coverage check. pass elif fn is _template_clustered: # clustered uses session_recency internally → continuous. _coverage[family].add("continuous") else: # All other templates default to session_recency (continuous). _coverage[family].add("continuous") # Only validate metric families that are actually present in the specs. _families_present = { ("hurdle" if kw.get("metric_id", "") in _HURDLE_METRICS else "binary") for _, kw in specs } for family in _families_present: missing = {"continuous", "categorical"} - _coverage[family] if missing: raise ValueError( f"build_catalog: insufficient driver coverage for {family!r} metric family. " f"Missing driver kinds: {sorted(missing)}. " "The catalog must include at least one continuous-driver and one " "categorical-driver (binary counts as categorical) scenario per metric family. " "Use categorical_driver='is_returning' or include templates with categorical drivers." ) return specs
# --------------------------------------------------------------------------- # Named profiles — preset catalog configurations for build_catalog() # --------------------------------------------------------------------------- #: Default 3-category cart config for the thorough profile's revenue_model_grid. _DEFAULT_CART_CONFIG = CartRevenueConfig( categories=[ ProductCategory(name="budget", base_price=15.0, price_std=3.0, base_purchase_prob=0.6), ProductCategory(name="mid", base_price=45.0, price_std=8.0, base_purchase_prob=0.3), ProductCategory(name="premium", base_price=120.0, price_std=20.0, base_purchase_prob=0.1), ] ) PROFILES: dict[str, dict] = { # ------------------------------------------------------------------ # validate — minimal smoke test: 1 effect_scale, no marginals, small n. # Core grid: 7 templates × 2 metrics × 1 es = 14 binary + 14 hurdle # = 14 binary + 14 hurdle (mirror) = 28 total. # categorical_driver ensures the coverage gate is satisfied by construction. # ------------------------------------------------------------------ "validate": { "effect_scale_grid": [1.0], "default_n_visitors": 200, "categorical_driver": "is_returning", }, # ------------------------------------------------------------------ # standard — routine development calibration, ~50 configs. # Core: 7 × 2 × 3 es = 42 (binary) + 42 (hurdle) = 84 core. # Marginals add ~14–20 more via severity_sigma, n_nuisance, n_visitors, # and baseline_rate_grid sweeps. # categorical_driver ensures the coverage gate is satisfied by construction. # ------------------------------------------------------------------ "standard": { "effect_scale_grid": [0.5, 1.0, 2.0], "severity_sigma_grid": [0.3, 1.0], "n_nuisance_grid": [0, 10], "n_visitors_grid": [500, 2000], "baseline_rate_grid": [0.02, 0.05, 0.15], "default_n_visitors": 1000, "categorical_driver": "is_returning", }, # ------------------------------------------------------------------ # thorough — pre-release dense coverage, ~150 configs. # Denser effect_scale grid + all marginal dimensions including # channel_mode_grid, treatment_allocation, benefit_prevalence, # and revenue_model_grid. # categorical_driver ensures the coverage gate is satisfied by construction. # ------------------------------------------------------------------ "thorough": { "effect_scale_grid": [0.25, 0.5, 1.0, 2.0, 4.0], "severity_sigma_grid": [0.2, 0.5, 1.0, 2.0], "n_nuisance_grid": [0, 5, 15], "n_visitors_grid": [250, 500, 1000, 2000, 5000], "treatment_allocation_grid": [0.3, 0.5, 0.7], "benefit_prevalence_grid": [0.05, 0.15, 0.30], "baseline_rate_grid": [0.02, 0.05, 0.10, 0.20], "revenue_model_grid": ["lognormal", _DEFAULT_CART_CONFIG], "channel_mode_grid": ["mirror", "opposing", "conversion_only", "severity_only"], "default_n_visitors": 1000, "categorical_driver": "is_returning", }, }