Source code for pytyche.generators.core

"""V2 generator core — single potential-outcomes pipeline.

Public API
----------
    generate_v2_core(config: V2GeneratorConfig) -> CalibrationBundle

Pipeline stages (filled in by subsequent tasks):
    1. sample_features(config, rng) -> X: pd.DataFrame
    2. compute_potential_outcomes(X, params) -> truth arrays
    3. assign_and_sample(X, truth, assignment, rng) -> observed outcomes
    4. build_bundle(observed, truth) -> CalibrationBundle

Config design
-------------
Typed frozen dataclasses with __post_init__ fail-closed validation.
Feature sampler is a discriminated union: CopulaConfig | MixtureConfig.
Both strategies produce the same output shape (X: pd.DataFrame).
Surfaces are Callable[[pd.DataFrame], np.ndarray].
"""

from __future__ import annotations

import dataclasses
from collections.abc import Callable
from typing import Any, Literal

import numpy as np
import pandas as pd
from scipy import stats

from pytyche.contracts import (
    AlignedVisitorArray,
    CalibrationBundle,
    CalibrationTruth,
    CartRevenueConfig,
    MetricFamily,
    ObservedExperimentData,
    VariantData,
)
from pytyche.validation import validate_alignment, validate_observed_data

# ---------------------------------------------------------------------------
# Feature spec
# ---------------------------------------------------------------------------

FeatureKind = Literal["continuous", "categorical", "binary"]

_VALID_FEATURE_KINDS: frozenset[str] = frozenset(
    {"continuous", "categorical", "binary"}
)


[docs] @dataclasses.dataclass(frozen=True) class FeatureSpec: """Specification for a single feature column. Fields: name: Column name. Must be non-empty. kind: Feature type — ``"continuous"``, ``"categorical"``, or ``"binary"``. """ name: str kind: FeatureKind def __post_init__(self) -> None: if not self.name: raise ValueError("FeatureSpec: name must be non-empty") if self.kind not in _VALID_FEATURE_KINDS: raise ValueError( f"FeatureSpec: kind must be one of {sorted(_VALID_FEATURE_KINDS)}, " f"got {self.kind!r}" )
# --------------------------------------------------------------------------- # Feature sampler configs # ---------------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class CopulaConfig: """Gaussian copula feature sampler configuration. Generates correlated mixed covariates via multivariate normal with configured correlation matrix and marginal transforms. Fields: features: Ordered feature specifications. Length determines the dimension of the copula. correlation: Square correlation matrix of shape ``(len(features), len(features))``. Must be positive semi-definite. """ features: tuple[FeatureSpec, ...] correlation: np.ndarray def __post_init__(self) -> None: n = len(self.features) if self.correlation.shape != (n, n): raise ValueError( f"CopulaConfig: correlation shape {self.correlation.shape} " f"must match (len(features), len(features)) = ({n}, {n})" ) # Validate correlation matrix properties. if not np.allclose(self.correlation, self.correlation.T, atol=1e-12): raise ValueError( "CopulaConfig: correlation matrix must be symmetric" ) if not np.allclose(np.diag(self.correlation), 1.0, atol=1e-12): raise ValueError( "CopulaConfig: correlation matrix diagonal must be all 1.0" ) if np.linalg.eigvalsh(self.correlation).min() < -1e-10: raise ValueError( "CopulaConfig: correlation matrix must be positive semi-definite" ) corr_copy = self.correlation.copy() corr_copy.flags.writeable = False object.__setattr__(self, "correlation", corr_copy)
[docs] @dataclasses.dataclass(frozen=True) class MixtureConfig: """Mixture-of-populations feature sampler configuration. Generates covariates by sampling latent cluster membership and then drawing from cluster-conditional feature distributions. Fields: features: Feature specifications shared across all clusters. weights: Per-cluster mixture weights. All must be strictly positive. Length must equal ``len(cluster_params)``. cluster_params: Per-cluster parameter dicts keyed by feature name (or by arbitrary convention per stage implementation). """ features: tuple[FeatureSpec, ...] weights: tuple[float, ...] cluster_params: tuple[dict[str, Any], ...] def __post_init__(self) -> None: if any(w <= 0 for w in self.weights): raise ValueError( "MixtureConfig: all weight values must be strictly positive" ) if len(self.weights) != len(self.cluster_params): raise ValueError( f"MixtureConfig: len(weights)={len(self.weights)} must equal " f"len(cluster_params)={len(self.cluster_params)}" )
#: Discriminated union of feature sampler strategy configs. FeatureSamplerConfig = CopulaConfig | MixtureConfig # --------------------------------------------------------------------------- # Surface config # ---------------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class SurfaceConfig: """HTE surface definition as a callable. The callable receives the feature DataFrame ``X`` (one row per visitor) and returns a 1-D numpy array of per-visitor values (e.g. conversion probabilities, severity means). Fields: fn: ``(X: pd.DataFrame) -> np.ndarray`` — must return a 1-D array of length ``len(X)``. """ fn: Callable[[pd.DataFrame], np.ndarray]
# --------------------------------------------------------------------------- # Assignment config # ---------------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class AssignmentConfig: """Fixed (forced) treatment-assignment policy. Configures the FIXED/forced assignment used for simple randomized designs and SBC — every visitor's treatment is drawn from a feature-independent distribution (binary 50/50 or a uniform/fixed per-treatment vector). Policy-routed assignment — where a sequential experiment's cell routes each visitor to a treatment based on their features — is NOT encoded here; it is supplied externally at observation time. This config is just one assignment policy (the forced/randomized one). **Binary (paired) form:** set ``treatment_allocation`` (fraction in (0, 1)); leave ``treatment_probabilities`` as ``None``. **Multi-treatment form:** set ``treatment_probabilities`` to a length-K tuple of positive fractions summing to 1.0 (within 1e-9) — the fixed per-treatment assignment probabilities (uniform = balanced forced exploration). ``treatment_allocation`` is unused in this form but must still be in (0, 1). Fields: treatment_allocation: Fraction of visitors assigned to treatment in the binary form. Must be in the open interval (0, 1). Default 0.5. treatment_probabilities: Fixed per-treatment assignment probabilities for K≥2 (the forced/randomized policy). ``None`` selects the binary paired form. Length must be ≥ 2, every entry strictly in (0, 1), and entries must sum to 1.0 (±1e-9). """ treatment_allocation: float = 0.5 treatment_probabilities: tuple[float, ...] | None = dataclasses.field(default=None, kw_only=True) @property def n_treatments(self) -> int: return 2 if self.treatment_probabilities is None else len(self.treatment_probabilities) def __post_init__(self) -> None: if not (0.0 < self.treatment_allocation < 1.0): raise ValueError( f"AssignmentConfig: treatment_allocation must be in (0, 1), " f"got {self.treatment_allocation}" ) if self.treatment_probabilities is not None: if len(self.treatment_probabilities) < 2: raise ValueError( f"AssignmentConfig: treatment_probabilities must have at least 2 entries, " f"got {len(self.treatment_probabilities)}" ) if not all(0.0 < v < 1.0 for v in self.treatment_probabilities): raise ValueError( "AssignmentConfig: every entry in treatment_probabilities must be in (0, 1)" ) if abs(sum(self.treatment_probabilities) - 1.0) > 1e-9: raise ValueError( f"AssignmentConfig: treatment_probabilities must sum to 1, " f"got {sum(self.treatment_probabilities)}" )
# --------------------------------------------------------------------------- # Metric mode # --------------------------------------------------------------------------- _VALID_METRIC_IDS: frozenset[str] = frozenset( {"conversion_rate", "revenue_per_visitor"} )
[docs] @dataclasses.dataclass(frozen=True) class MetricMode: """Metric-specific potential outcome surfaces. **K=2 (paired) form:** populate the six paired surface fields (``p0_surface``, ``p1_surface``, and for hurdle metrics ``m0/m1/sigma0/sigma1``); leave ``p_surfaces``, ``m_surfaces``, and ``sigma_surfaces`` as ``None``. **Multi-treatment form:** leave all six paired surface fields as ``None`` and populate ``p_surfaces`` (length K) and, for hurdle metrics, ``m_surfaces`` and ``sigma_surfaces`` (also length K). K must be ≥ 3 — the K = 2 case uses the paired form (the truth-computation dispatch routes list-form surfaces through the multi-treatment path only at K ≥ 3). Every surface is a function of the individual user's attributes ``X``. A ``SurfaceConfig`` carries the potential-outcome surface for ONE treatment level: ``p_surfaces[k](X)`` is the conversion potential outcome a user with features ``X`` WOULD show under treatment level ``k`` (level 0 = control, the prognostic baseline; levels 1…K−1 = treated). The surfaces are not properties of an assignment cohort — assignment (randomization, and in a sequential experiment a cell's policy) merely selects which of a user's K potential outcomes is observed. For binary metrics (``"conversion_rate"``): only conversion probability surfaces are needed; severity/sigma surfaces must be ``None``. For hurdle metrics (``"revenue_per_visitor"``): conversion, severity, and sigma surfaces are all required. Fields: metric_id: Canonical metric name. Must be a known metric. p0_surface: Control (level-0) conversion potential-outcome surface (paired form). p1_surface: Treated (level-1) conversion potential-outcome surface (paired form). m0_surface: Control severity (AOV) surface. Hurdle paired form only. m1_surface: Treated severity (AOV) surface. Hurdle paired form only. sigma0_surface: Control LogNormal dispersion surface. Hurdle paired only. sigma1_surface: Treated LogNormal dispersion surface. Hurdle paired only. p_surfaces: Conversion potential-outcome surfaces, one per treatment level (length K, index 0 = control). Each is a function of user attributes ``X``. m_surfaces: Severity potential-outcome surfaces, one per treatment level (length K). Hurdle multi-treatment form. sigma_surfaces: LogNormal dispersion surfaces, one per treatment level (length K). Hurdle multi-treatment form. """ metric_id: str p0_surface: SurfaceConfig | None = None p1_surface: SurfaceConfig | None = None m0_surface: SurfaceConfig | None = None m1_surface: SurfaceConfig | None = None sigma0_surface: SurfaceConfig | None = None sigma1_surface: SurfaceConfig | None = None p_surfaces: list[SurfaceConfig] | None = dataclasses.field(default=None, kw_only=True) m_surfaces: list[SurfaceConfig] | None = dataclasses.field(default=None, kw_only=True) sigma_surfaces: list[SurfaceConfig] | None = dataclasses.field(default=None, kw_only=True) @property def n_treatments(self) -> int: return 2 if self.p_surfaces is None else len(self.p_surfaces) def __post_init__(self) -> None: if self.p_surfaces is not None: # multi-treatment (K>=3) list form if ( self.p0_surface is not None or self.p1_surface is not None or self.m0_surface is not None or self.m1_surface is not None or self.sigma0_surface is not None or self.sigma1_surface is not None ): raise ValueError( "MetricMode: cannot mix paired and multi-treatment surface forms" ) if len(self.p_surfaces) < 3: raise ValueError( f"MetricMode: the list surface form is for K >= 3 " f"(got {len(self.p_surfaces)} entries); use the paired " "p0/p1 surface form for K = 2" ) if self.metric_id not in _VALID_METRIC_IDS: raise ValueError( f"MetricMode: metric_id must be one of " f"{sorted(_VALID_METRIC_IDS)}, got {self.metric_id!r}" ) if self.metric_id == "conversion_rate": if self.m_surfaces is not None or self.sigma_surfaces is not None: raise ValueError( "MetricMode: conversion_rate must not have hurdle surfaces " "(m_surfaces and sigma_surfaces must be None)" ) if self.metric_id == "revenue_per_visitor": if self.m_surfaces is None or self.sigma_surfaces is None: raise ValueError( "MetricMode: revenue_per_visitor requires m_surfaces and sigma_surfaces" ) if ( len(self.m_surfaces) != len(self.p_surfaces) or len(self.sigma_surfaces) != len(self.p_surfaces) ): raise ValueError( "MetricMode: m_surfaces and sigma_surfaces must have the same " "length as p_surfaces" ) else: # Paired (K=2) form — run existing validation exactly as before if self.p0_surface is None or self.p1_surface is None: raise ValueError( "MetricMode: paired form requires p0_surface and p1_surface" ) if self.metric_id not in _VALID_METRIC_IDS: raise ValueError( f"MetricMode: metric_id must be one of " f"{sorted(_VALID_METRIC_IDS)}, got {self.metric_id!r}" ) if self.metric_id == "conversion_rate": if self.m0_surface is not None or self.m1_surface is not None: raise ValueError( "MetricMode: conversion_rate must not have hurdle surfaces " "(m0_surface and m1_surface must be None)" ) if self.sigma0_surface is not None or self.sigma1_surface is not None: raise ValueError( "MetricMode: conversion_rate must not have sigma surfaces " "(sigma0_surface and sigma1_surface must be None)" ) if self.metric_id == "revenue_per_visitor": if self.m0_surface is None or self.m1_surface is None: raise ValueError( "MetricMode: revenue_per_visitor requires both hurdle surfaces " "(m0_surface and m1_surface must not be None)" ) if self.sigma0_surface is None or self.sigma1_surface is None: raise ValueError( "MetricMode: revenue_per_visitor requires both sigma surfaces " "(sigma0_surface and sigma1_surface must not be None)" )
# --------------------------------------------------------------------------- # Top-level generator config # ---------------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class V2GeneratorConfig: """Top-level configuration for the v2 core generator pipeline. Fields: n_visitors: Total visitor count across all arms. Must be positive. feature_sampler: Feature sampling strategy config (``CopulaConfig`` or ``MixtureConfig``). metric_mode: Metric and potential-outcome surface definitions. assignment: Treatment assignment parameters. seed: Random seed for full pipeline reproducibility. experiment_id: Identifier for the generated experiment. Default ``"sim-v2"``. revenue_model: Revenue sampling strategy for hurdle metrics. ``None`` (default) uses LogNormal sampling (existing behavior). A ``CartRevenueConfig`` activates cart-based revenue sampling — per-category Bernoulli draws whose prices sum to converter revenue. Ignored for binary (``conversion_rate``) metrics. """ n_visitors: int feature_sampler: FeatureSamplerConfig metric_mode: MetricMode assignment: AssignmentConfig seed: int experiment_id: str = "sim-v2" revenue_model: CartRevenueConfig | None = None def __post_init__(self) -> None: if self.n_visitors <= 0: raise ValueError( f"V2GeneratorConfig: n_visitors must be positive, " f"got {self.n_visitors}" )
# --------------------------------------------------------------------------- # Feature sampling # --------------------------------------------------------------------------- # Number of discrete categories for "categorical" features in the copula # strategy. Copula draws are uniform; we threshold into equal-probability bins. _COPULA_N_CATEGORIES: int = 4 def _sample_copula( config: CopulaConfig, rng: np.random.Generator, n: int ) -> pd.DataFrame: """Sample features via Gaussian copula with marginal transforms. Algorithm: 1. Draw ``n`` samples from a zero-mean multivariate normal with the configured correlation matrix. 2. Convert to uniform marginals via the standard normal CDF. 3. Apply per-feature inverse transforms based on ``FeatureKind``: - continuous: keep the raw normal values (standard-normal marginals). - categorical: threshold uniform quantiles into ``_COPULA_N_CATEGORIES`` equal-probability bins labelled as strings. - binary: threshold the raw normal value at 0 → bool. """ d = len(config.features) # Step 1: draw from zero-mean MVN with the configured correlation matrix. z = rng.multivariate_normal(mean=np.zeros(d), cov=config.correlation, size=n) # Step 2: convert to uniform marginals for categorical thresholding. u = stats.norm.cdf(z) # shape (n, d), values in (0, 1) columns: dict[str, object] = {} for j, feat in enumerate(config.features): if feat.kind == "continuous": columns[feat.name] = z[:, j].astype(float) elif feat.kind == "categorical": # Partition (0, 1] into equal bins; label as "cat_0", "cat_1", … bins = np.linspace(0.0, 1.0, _COPULA_N_CATEGORIES + 1) indices = np.digitize(u[:, j], bins[1:], right=True) # np.digitize with right=True and bins[1:] gives 0-based indices. label_names = [f"cat_{i}" for i in range(_COPULA_N_CATEGORIES)] raw = [label_names[i] for i in np.clip(indices, 0, _COPULA_N_CATEGORIES - 1)] # Use pd.Categorical so the dtype is CategoricalDtype (not StringDtype). columns[feat.name] = pd.Categorical(raw, categories=label_names) elif feat.kind == "binary": columns[feat.name] = (z[:, j] > 0.0) else: raise ValueError( f"sample_features: unknown feature kind {feat.kind!r} " f"for feature {feat.name!r}" ) return pd.DataFrame(columns) def _sample_mixture( config: MixtureConfig, rng: np.random.Generator, n: int ) -> pd.DataFrame: """Sample features via mixture-of-populations with cluster-conditional distributions. Algorithm: 1. Normalize ``config.weights`` to sum to 1. 2. Assign each of ``n`` visitors to a cluster using ``rng.choice`` weighted by the normalized weights. 3. For each visitor, sample features from the cluster-conditional parameters: - continuous: ``{name}_mean`` and ``{name}_std`` from ``cluster_params``. - categorical: ``{name}_probs`` dict (label -> probability) from ``cluster_params``. Labels are drawn proportionally. - binary: ``{name}_prob`` from ``cluster_params``. 4. Include a ``cluster_id`` column (integer cluster index) in the output. """ weights = np.array(config.weights, dtype=float) normalized_weights = weights / weights.sum() n_clusters = len(config.cluster_params) # Assign each visitor to a cluster. cluster_assignments: np.ndarray = rng.choice(n_clusters, size=n, p=normalized_weights) columns: dict[str, object] = {} for feat in config.features: if feat.kind == "continuous": values = np.empty(n, dtype=float) for cluster_idx in range(n_clusters): mask = cluster_assignments == cluster_idx if not np.any(mask): continue params = config.cluster_params[cluster_idx] mean = float(params[f"{feat.name}_mean"]) std = float(params[f"{feat.name}_std"]) values[mask] = rng.normal(loc=mean, scale=std, size=int(mask.sum())) columns[feat.name] = values elif feat.kind == "categorical": values_arr = np.empty(n, dtype=object) for cluster_idx in range(n_clusters): mask = cluster_assignments == cluster_idx count = int(mask.sum()) if count == 0: continue params = config.cluster_params[cluster_idx] probs_dict: dict[str, float] = params[f"{feat.name}_probs"] labels = list(probs_dict.keys()) probs = np.array(list(probs_dict.values()), dtype=float) probs = probs / probs.sum() chosen = rng.choice(labels, size=count, p=probs) values_arr[mask] = chosen # Use pd.Categorical so the dtype is CategoricalDtype. all_labels: list[str] = [] for params in config.cluster_params: for label in params[f"{feat.name}_probs"].keys(): if label not in all_labels: all_labels.append(label) columns[feat.name] = pd.Categorical(values_arr, categories=all_labels) elif feat.kind == "binary": values_bool = np.empty(n, dtype=bool) for cluster_idx in range(n_clusters): mask = cluster_assignments == cluster_idx count = int(mask.sum()) if count == 0: continue params = config.cluster_params[cluster_idx] prob = float(params[f"{feat.name}_prob"]) values_bool[mask] = rng.random(size=count) < prob columns[feat.name] = values_bool else: raise ValueError( f"_sample_mixture: unknown feature kind {feat.kind!r} " f"for feature {feat.name!r}" ) columns["cluster_id"] = cluster_assignments return pd.DataFrame(columns)
[docs] def sample_features( config: FeatureSamplerConfig, rng: np.random.Generator, n: int ) -> pd.DataFrame: """Sample a feature matrix from the given sampler config. Parameters ---------- config: Either a ``CopulaConfig`` (Gaussian copula strategy) or a ``MixtureConfig`` (mixture-of-populations strategy). rng: Explicit RNG instance — caller controls seeding for reproducibility. n: Number of rows to sample. Returns ------- pd.DataFrame Shape ``(n, len(config.features))``. Column names and dtypes follow the feature specs in ``config.features``. For ``MixtureConfig`` an additional ``cluster_id`` column is included (used internally for per-cluster surface dispatch; stripped before output by ``generate_v2_core``). Raises ------ TypeError Raised for unsupported config types. ValueError Raised for unknown feature kinds within a config. """ if isinstance(config, CopulaConfig): return _sample_copula(config, rng, n) elif isinstance(config, MixtureConfig): return _sample_mixture(config, rng, n) else: raise TypeError( f"sample_features: unsupported config type {type(config).__name__!r}" )
# --------------------------------------------------------------------------- # Potential outcomes — truth computation # ---------------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class TruthResult: """Internal intermediate result from compute_potential_outcomes. Holds per-visitor CATE arrays and population summary statistics before assembly into a CalibrationBundle. This is not part of the public contract — build_bundle (task 7.2) constructs CalibrationTruth from this. **K-dispatch:** K=2 (paired form): populates the legacy paired ``*_values`` fields (``p0_values``, ``p1_values``, and for hurdle ``m0/m1/sigma0/sigma1_values``) plus ``cate_per_visitor``; leaves ``p_values``, ``m_values``, ``sigma_values`` as ``None``. K≥3 (multi-treatment form): populates ``p_values`` (length K), and for hurdle ``m_values`` / ``sigma_values`` (also length K); sets ``cate_per_visitor=None`` and leaves all six legacy paired ``*_values`` fields as ``None``. Fields: cate_per_visitor: Per-visitor CATE, aligned 1:1 with input rows. Populated at K=2; ``None`` at K≥3. effect: Population mean CATE (mean of cate_per_visitor.values at K=2; mean of the best-treatment contrast at K≥3 — provisional scalar). effect_components: Named decomposition of the population effect. Binary K=2: ``{"conv_effect": float}`` Hurdle K=2: ``{"conv_effect": float, "aov_effect": float}`` K≥3: ``{"treatment_1_effect": float, ..., "treatment_{K-1}_effect": float}`` conv_per_visitor: Per-visitor conversion component. Only populated for hurdle K=2 metrics; ``None`` for binary or K≥3. aov_per_visitor: Per-visitor AOV component. Only populated for hurdle K=2 metrics; ``None`` for binary or K≥3. p0_values: Per-visitor control conversion probabilities. Populated by both binary and hurdle K=2 computations for use in outcome sampling. p1_values: Per-visitor treatment conversion probabilities. K=2 only. m0_values: Per-visitor control severity means (raw surface output). Hurdle K=2 only; ``None`` for binary or K≥3. m1_values: Per-visitor treatment severity means (raw surface output). Hurdle K=2 only; ``None`` for binary or K≥3. m0_effective: Per-visitor control expected revenue conditional on conversion. For lognormal, equals ``m0_values``. For cart, this is the analytical expected cart revenue. ``None`` for binary or K≥3. m1_effective: Per-visitor treatment expected revenue conditional on conversion. For lognormal, equals ``m1_values``. For cart, this is the analytical expected cart revenue. ``None`` for binary or K≥3. sigma0_values: Per-visitor control LogNormal dispersion. Hurdle K=2 only; ``None`` for binary or K≥3. sigma1_values: Per-visitor treatment LogNormal dispersion. Hurdle K=2 only; ``None`` for binary or K≥3. p_values: Per-visitor conversion potential outcomes, one array per treatment level (length K, index 0 = control). K≥3 only; ``None`` at K=2. m_values: Per-visitor severity potential outcomes (length K). K≥3 hurdle only; ``None`` for K=2 or conversion_rate. sigma_values: Per-visitor LogNormal dispersion (length K). K≥3 hurdle only; ``None`` for K=2 or conversion_rate. """ cate_per_visitor: AlignedVisitorArray | None effect: float effect_components: dict[str, float] conv_per_visitor: AlignedVisitorArray | None = None aov_per_visitor: AlignedVisitorArray | None = None p0_values: np.ndarray | None = None p1_values: np.ndarray | None = None m0_values: np.ndarray | None = None m1_values: np.ndarray | None = None m0_effective: np.ndarray | None = None m1_effective: np.ndarray | None = None sigma0_values: np.ndarray | None = None sigma1_values: np.ndarray | None = None p_values: list[np.ndarray] | None = None m_values: list[np.ndarray] | None = None sigma_values: list[np.ndarray] | None = None
def _validate_surface_output( values: object, n_expected: int, surface_name: str, *, lower_bound: float | None = None, upper_bound: float | None = None, strict_lower: bool = False, ) -> np.ndarray: """Validate and coerce a surface output to a well-formed 1-D ndarray. Checks: 1. Coerce to ndarray via ``np.asarray``. 2. Must be 1-D. 3. Length must equal ``n_expected``. 4. All values must be finite (no NaN or Inf). 5. Optional domain bounds (inclusive or strict-lower). Returns the validated ndarray. Raises ------ ValueError On any violation — with a message naming the surface. """ arr = np.asarray(values, dtype=float) if arr.ndim != 1: raise ValueError( f"Surface '{surface_name}': expected 1-D array, got ndim={arr.ndim}" ) if len(arr) != n_expected: raise ValueError( f"Surface '{surface_name}': expected length {n_expected}, got {len(arr)}" ) if not np.all(np.isfinite(arr)): raise ValueError( f"Surface '{surface_name}': contains non-finite values (NaN or Inf)" ) if lower_bound is not None: if strict_lower: if not np.all(arr > lower_bound): raise ValueError( f"Surface '{surface_name}': all values must be > {lower_bound}" ) else: if not np.all(arr >= lower_bound): raise ValueError( f"Surface '{surface_name}': all values must be >= {lower_bound}" ) if upper_bound is not None: if not np.all(arr <= upper_bound): raise ValueError( f"Surface '{surface_name}': all values must be <= {upper_bound}" ) return arr def _compute_binary_truth( features: pd.DataFrame, metric_mode: MetricMode ) -> TruthResult: """Compute per-visitor CATE for a binary (conversion_rate) metric.""" assert metric_mode.p0_surface is not None and metric_mode.p1_surface is not None, ( "_compute_binary_truth called with paired-form MetricMode missing p0/p1 surfaces" ) n = len(features) p0_values = _validate_surface_output( metric_mode.p0_surface.fn(features), n, "p0", lower_bound=0.0, upper_bound=1.0 ) p1_values = _validate_surface_output( metric_mode.p1_surface.fn(features), n, "p1", lower_bound=0.0, upper_bound=1.0 ) cate = p1_values - p0_values effect = float(np.mean(cate)) return TruthResult( cate_per_visitor=AlignedVisitorArray(values=cate, n_visitors=n), effect=effect, effect_components={"conv_effect": effect}, p0_values=p0_values, p1_values=p1_values, ) def _compute_cart_expected_revenue( cart_config: CartRevenueConfig, severity_delta: np.ndarray ) -> np.ndarray: """Analytical expected cart revenue per converter given severity delta. For each visitor *i* and category *j*: purchase_prob_j(i) = sigmoid(logit(base_j) + severity_delta[i] × weight_j) Expected revenue includes the empty-cart fallback (cheapest category forced): E[rev | convert, i] = Σ_j(purchase_prob_j(i) × base_price_j) + P(empty_cart_i) × cheapest_price Parameters ---------- cart_config: Cart revenue model configuration. severity_delta: Per-visitor logit-additive shift. Shape ``(n,)``. Pass zeros for the control arm. Returns ------- np.ndarray Shape ``(n,)`` — expected revenue conditional on conversion. """ categories = cart_config.categories base_probs = np.array([c.base_purchase_prob for c in categories], dtype=float) base_prices = np.array([c.base_price for c in categories], dtype=float) price_stds = np.array([c.price_std for c in categories], dtype=float) weights = base_probs / base_probs.sum() logit_base = np.log(base_probs / (1.0 - base_probs)) # Compute expected price per category under the truncated normal sampling # distribution: X ~ N(mu, sigma) truncated below at a = mu / 2. # E[X | X > a] = mu + sigma * phi(alpha) / (1 - Phi(alpha)) # where alpha = (a - mu) / sigma. expected_prices = np.empty_like(base_prices) for j in range(len(categories)): if price_stds[j] == 0.0: expected_prices[j] = base_prices[j] else: a = base_prices[j] / 2.0 alpha = (a - base_prices[j]) / price_stds[j] expected_prices[j] = base_prices[j] + price_stds[j] * ( stats.norm.pdf(alpha) / (1.0 - stats.norm.cdf(alpha)) ) # (n_visitors, n_categories) logit_probs = ( logit_base[np.newaxis, :] + severity_delta[:, np.newaxis] * weights[np.newaxis, :] ) purchase_probs = 1.0 / (1.0 + np.exp(-logit_probs)) # sigmoid # Expected revenue from each category using truncated-normal expected prices. expected_per_category = purchase_probs * expected_prices[np.newaxis, :] # (n, n_cat) # Empty-cart probability: prod(1 - purchase_prob_j). empty_prob = np.prod(1.0 - purchase_probs, axis=1) # (n,) # Empty-cart fallback uses the truncated mean of the cheapest category's price. cheapest_idx = int(np.argmin(base_prices)) cheapest_expected_price = float(expected_prices[cheapest_idx]) return expected_per_category.sum(axis=1) + empty_prob * cheapest_expected_price def _compute_hurdle_truth( features: pd.DataFrame, metric_mode: MetricMode, revenue_model: CartRevenueConfig | None = None, ) -> TruthResult: """Compute per-visitor CATE and additive decomposition for a hurdle metric.""" assert metric_mode.p0_surface is not None and metric_mode.p1_surface is not None, ( "_compute_hurdle_truth called with paired-form MetricMode missing p0/p1 surfaces" ) n = len(features) p0 = _validate_surface_output( metric_mode.p0_surface.fn(features), n, "p0", lower_bound=0.0, upper_bound=1.0 ) p1 = _validate_surface_output( metric_mode.p1_surface.fn(features), n, "p1", lower_bound=0.0, upper_bound=1.0 ) if metric_mode.m0_surface is None or metric_mode.m1_surface is None: raise ValueError( "_compute_hurdle_truth: m0_surface and m1_surface must not be None" ) m0 = _validate_surface_output( metric_mode.m0_surface.fn(features), n, "m0", lower_bound=0.0, strict_lower=True ) m1 = _validate_surface_output( metric_mode.m1_surface.fn(features), n, "m1", lower_bound=0.0, strict_lower=True ) if metric_mode.sigma0_surface is None or metric_mode.sigma1_surface is None: raise ValueError( "_compute_hurdle_truth: sigma0_surface and sigma1_surface must not be None" ) sigma0 = _validate_surface_output( metric_mode.sigma0_surface.fn(features), n, "sigma0", lower_bound=0.0, strict_lower=True ) sigma1 = _validate_surface_output( metric_mode.sigma1_surface.fn(features), n, "sigma1", lower_bound=0.0, strict_lower=True ) # Effective "expected revenue | convert" values for the decomposition. # For lognormal, the raw surface values (m0, m1) ARE the expected revenue. # For cart, we compute analytical expected cart revenue per visitor. if revenue_model is not None: severity_delta_treat = m1 - m0 m0_eff = _compute_cart_expected_revenue( revenue_model, np.zeros(n, dtype=float) ) m1_eff = _compute_cart_expected_revenue(revenue_model, severity_delta_treat) else: m0_eff = m0 m1_eff = m1 cate = p1 * m1_eff - p0 * m0_eff conv = (p1 - p0) * m0_eff aov = p1 * (m1_eff - m0_eff) # Per-visitor decomposition identity check (ValueError, not assert). if not np.allclose(cate, conv + aov, atol=1e-10): max_err = float(np.max(np.abs(cate - (conv + aov)))) raise ValueError( f"Per-visitor decomposition identity violated: " f"max |cate - (conv + aov)| = {max_err:.2e}" ) effect = float(np.mean(cate)) return TruthResult( cate_per_visitor=AlignedVisitorArray(values=cate, n_visitors=n), effect=effect, effect_components={ "conv_effect": float(np.mean(conv)), "aov_effect": float(np.mean(aov)), }, conv_per_visitor=AlignedVisitorArray(values=conv, n_visitors=n), aov_per_visitor=AlignedVisitorArray(values=aov, n_visitors=n), p0_values=p0, p1_values=p1, m0_values=m0, m1_values=m1, m0_effective=m0_eff, m1_effective=m1_eff, sigma0_values=sigma0, sigma1_values=sigma1, ) def _compute_truth_multi( features: pd.DataFrame, metric_mode: MetricMode, revenue_model: CartRevenueConfig | None = None, ) -> TruthResult: """Compute truth for the multi-treatment (K≥3) form. Evaluates all K per-treatment-level potential-outcome surfaces and derives K−1 treatment-vs-control contrasts. Only the lognormal severity model is supported; passing a non-None ``revenue_model`` raises ``NotImplementedError``. Parameters ---------- features: Feature matrix, one row per visitor. metric_mode: Multi-treatment MetricMode (``p_surfaces`` non-None, length K≥3). revenue_model: Must be ``None``; K≥3 cart revenue is out of scope. Returns ------- TruthResult K≥3 form: ``p_values`` / ``m_values`` / ``sigma_values`` (length K); ``cate_per_visitor=None``; legacy paired fields all ``None``. """ if revenue_model is not None: raise NotImplementedError( "K>=3 cart revenue is out of scope; lognormal only" ) assert metric_mode.p_surfaces is not None, ( "_compute_truth_multi called without p_surfaces" ) n = len(features) K = metric_mode.n_treatments p_values: list[np.ndarray] = [ _validate_surface_output( metric_mode.p_surfaces[k].fn(features), n, f"p{k}", lower_bound=0.0, upper_bound=1.0, ) for k in range(K) ] if metric_mode.metric_id == "revenue_per_visitor": assert metric_mode.m_surfaces is not None and metric_mode.sigma_surfaces is not None m_values: list[np.ndarray] | None = [ _validate_surface_output( metric_mode.m_surfaces[k].fn(features), n, f"m{k}", lower_bound=0.0, strict_lower=True, ) for k in range(K) ] sigma_values: list[np.ndarray] | None = [ _validate_surface_output( metric_mode.sigma_surfaces[k].fn(features), n, f"sigma{k}", lower_bound=0.0, strict_lower=True, ) for k in range(K) ] # K−1 per-treatment contrasts: treatment_j vs. control (index 0). effect_components: dict[str, float] = { f"treatment_{j + 1}_effect": float( np.mean(p_values[j + 1] * m_values[j + 1] - p_values[0] * m_values[0]) ) for j in range(K - 1) } else: # conversion_rate — no severity surfaces. m_values = None sigma_values = None effect_components = { f"treatment_{j + 1}_effect": float( np.mean(p_values[j + 1] - p_values[0]) ) for j in range(K - 1) } # Provisional best-treatment scalar: max over the K−1 contrasts. # This is a provisional decision-relevant summary; the K≥3 SBC operates # per-contrast (Phase 12) rather than collapsing to one scalar. effect = float(max(effect_components.values())) return TruthResult( cate_per_visitor=None, effect=effect, effect_components=effect_components, p_values=p_values, m_values=m_values, sigma_values=sigma_values, )
[docs] def compute_potential_outcomes( features: pd.DataFrame, metric_mode: MetricMode, revenue_model: CartRevenueConfig | None = None, ) -> TruthResult: """Compute per-visitor potential outcomes and derive truth arrays. Dispatches based on ``metric_mode.n_treatments``: - ``n_treatments == 2`` (paired form): routes to ``_compute_binary_truth`` or ``_compute_hurdle_truth`` exactly as before. - ``n_treatments >= 3`` (multi-treatment form): routes to ``_compute_truth_multi``. Parameters ---------- features: Feature matrix, one row per visitor. Produced by ``sample_features``. metric_mode: Metric and surface definitions. Determines which computation path to use (binary, hurdle, or multi-treatment). revenue_model: Cart revenue configuration. When provided, the hurdle decomposition uses analytical expected cart revenue instead of the raw severity surface values. Ignored for binary metrics. Not supported for K≥3. Returns ------- TruthResult Per-visitor CATE, population effect, and decomposition components. Raises ------ ValueError If ``metric_mode.metric_id`` is not a supported metric. NotImplementedError If ``revenue_model`` is non-None and ``n_treatments >= 3``. """ if metric_mode.n_treatments >= 3: return _compute_truth_multi(features, metric_mode, revenue_model) if metric_mode.metric_id == "conversion_rate": return _compute_binary_truth(features, metric_mode) elif metric_mode.metric_id == "revenue_per_visitor": return _compute_hurdle_truth(features, metric_mode, revenue_model) else: raise ValueError( f"compute_potential_outcomes: unsupported metric_id " f"{metric_mode.metric_id!r}" )
# --------------------------------------------------------------------------- # Metric-family mapping # --------------------------------------------------------------------------- _METRIC_FAMILY: dict[str, MetricFamily] = { "conversion_rate": MetricFamily.BINARY, "revenue_per_visitor": MetricFamily.HURDLE_REAL, } def _metric_to_family(metric_id: str) -> MetricFamily: """Map a canonical metric_id to its MetricFamily. Raises ------ ValueError If metric_id is not a known metric. """ try: return _METRIC_FAMILY[metric_id] except KeyError: raise ValueError( f"_metric_to_family: unknown metric_id {metric_id!r}. " f"Known: {sorted(_METRIC_FAMILY)}" ) from None # --------------------------------------------------------------------------- # Truth construction # ---------------------------------------------------------------------------
[docs] def build_truth(truth_result: TruthResult, metric_mode: MetricMode) -> CalibrationTruth: """Construct a CalibrationTruth from computed potential outcomes. Bridges the internal ``TruthResult`` (pipeline intermediate) to the public ``CalibrationTruth`` contract. **K-dispatch:** K=2 (paired form): populates the legacy 1-D fields (``cate_per_visitor``, ``conv/aov_cate_per_visitor``, ``p0/p1/m0/m1_per_visitor``); leaves the three new list fields (``contrast_cate_per_visitor``, ``p_per_visitor``, ``m_per_visitor``) as ``None``. K≥3 (multi-treatment form): populates ``contrast_cate_per_visitor`` (length K−1), ``p_per_visitor`` (length K), and ``m_per_visitor`` (length K, or ``None`` for ``conversion_rate``); leaves all legacy 1-D fields as ``None``. Parameters ---------- truth_result: Output of ``compute_potential_outcomes`` — holds per-visitor CATE, population effect, and decomposition components. metric_mode: The metric configuration used to compute truth — provides ``metric_id`` for family derivation. Returns ------- CalibrationTruth Typed, frozen ground truth. """ family = _metric_to_family(metric_mode.metric_id) # Multi-treatment (K≥3) branch — truth_result.p_values is the signal. if truth_result.p_values is not None: p_vals = truth_result.p_values m_vals = truth_result.m_values K = len(p_vals) n = len(p_vals[0]) # K−1 per-treatment contrasts vs. control. if m_vals is not None: contrast_cate_per_visitor = [ AlignedVisitorArray( values=p_vals[j + 1] * m_vals[j + 1] - p_vals[0] * m_vals[0], n_visitors=n, ) for j in range(K - 1) ] else: # conversion_rate: contrast is p[j+1] - p[0]. contrast_cate_per_visitor = [ AlignedVisitorArray( values=p_vals[j + 1] - p_vals[0], n_visitors=n, ) for j in range(K - 1) ] p_per_visitor = [ AlignedVisitorArray(values=p_vals[k], n_visitors=n) for k in range(K) ] m_per_visitor: list[AlignedVisitorArray] | None = ( [AlignedVisitorArray(values=m_vals[k], n_visitors=n) for k in range(K)] if m_vals is not None else None ) return CalibrationTruth( effect=truth_result.effect, metric_id=metric_mode.metric_id, metric_family=family, effect_components=truth_result.effect_components, cate_per_visitor=None, contrast_cate_per_visitor=contrast_cate_per_visitor, p_per_visitor=p_per_visitor, m_per_visitor=m_per_visitor, ) # K=2 (paired) branch — existing behavior unchanged. assert truth_result.cate_per_visitor is not None, ( "build_truth: K=2 TruthResult must have cate_per_visitor non-None" ) n = truth_result.cate_per_visitor.n_visitors # Wire through raw surface values as AlignedVisitorArrays for hurdle metrics. p0_per_visitor = ( AlignedVisitorArray(values=truth_result.p0_values, n_visitors=n) if truth_result.p0_values is not None and metric_mode.metric_id == "revenue_per_visitor" else None ) p1_per_visitor = ( AlignedVisitorArray(values=truth_result.p1_values, n_visitors=n) if truth_result.p1_values is not None and metric_mode.metric_id == "revenue_per_visitor" else None ) # Use effective m values (= analytical cart expected revenue when cart, # = raw surface values when lognormal) for CalibrationTruth so downstream # calibration compares against the correct ground truth. m0_vals_k2 = truth_result.m0_effective if truth_result.m0_effective is not None else truth_result.m0_values m1_vals_k2 = truth_result.m1_effective if truth_result.m1_effective is not None else truth_result.m1_values m0_per_visitor_k2 = ( AlignedVisitorArray(values=m0_vals_k2, n_visitors=n) if m0_vals_k2 is not None else None ) m1_per_visitor_k2 = ( AlignedVisitorArray(values=m1_vals_k2, n_visitors=n) if m1_vals_k2 is not None else None ) return CalibrationTruth( effect=truth_result.effect, metric_id=metric_mode.metric_id, metric_family=family, effect_components=truth_result.effect_components, cate_per_visitor=truth_result.cate_per_visitor, conv_cate_per_visitor=truth_result.conv_per_visitor, aov_cate_per_visitor=truth_result.aov_per_visitor, p0_per_visitor=p0_per_visitor, p1_per_visitor=p1_per_visitor, m0_per_visitor=m0_per_visitor_k2, m1_per_visitor=m1_per_visitor_k2, )
# --------------------------------------------------------------------------- # Composable surface helpers # ---------------------------------------------------------------------------
[docs] def add_surfaces(s1: SurfaceConfig, s2: SurfaceConfig) -> SurfaceConfig: """Return a surface whose output is the element-wise sum of two surfaces. Parameters ---------- s1, s2: Input surfaces. Both receive the same feature DataFrame ``X``. Returns ------- SurfaceConfig Surface computing ``s1(X) + s2(X)``. """ return SurfaceConfig(fn=lambda X: s1.fn(X) + s2.fn(X))
[docs] def multiply_surfaces(s1: SurfaceConfig, s2: SurfaceConfig) -> SurfaceConfig: """Return a surface whose output is the element-wise product of two surfaces. Parameters ---------- s1, s2: Input surfaces. Both receive the same feature DataFrame ``X``. Returns ------- SurfaceConfig Surface computing ``s1(X) * s2(X)``. """ return SurfaceConfig(fn=lambda X: s1.fn(X) * s2.fn(X))
[docs] def sigmoid_surface(s: SurfaceConfig) -> SurfaceConfig: """Return a surface applying the sigmoid transform to an inner surface. The sigmoid maps any real value to the open interval (0, 1):: sigmoid(z) = 1 / (1 + exp(-z)) Useful for composing linear or nonlinear terms into a valid probability surface without manual clipping. Parameters ---------- s: Inner surface whose output is passed through sigmoid. Returns ------- SurfaceConfig Surface computing ``sigmoid(s(X))``. """ return SurfaceConfig(fn=lambda X: 1.0 / (1.0 + np.exp(-s.fn(X))))
[docs] def threshold_surface( s: SurfaceConfig, cutoff: float, above: float, below: float, ) -> SurfaceConfig: """Return a step-function surface based on a threshold applied to an inner surface. Produces a discontinuous surface suitable for stress-testing HTE estimators that assume smooth effect functions:: result[i] = above if s(X)[i] > cutoff below otherwise Parameters ---------- s: Inner surface to threshold. cutoff: Decision boundary. Strict ``>`` comparison. above: Output value where ``s(X) > cutoff``. below: Output value where ``s(X) <= cutoff``. Returns ------- SurfaceConfig Step-function surface. """ return SurfaceConfig(fn=lambda X: np.where(s.fn(X) > cutoff, above, below))
# --------------------------------------------------------------------------- # Cart-based revenue sampler (Task 15.2) # --------------------------------------------------------------------------- def _sample_cart_revenue( cart_config: CartRevenueConfig, severity_delta: np.ndarray, converter_mask: np.ndarray, rng: np.random.Generator, ) -> np.ndarray: """Sample revenue for each visitor using the cart-based model. For each converter, fires a Bernoulli for each product category. The purchase probability for category ``j`` and visitor ``i`` is: ``purchase_prob_j(i) = sigmoid(logit(base_purchase_prob_j) + severity_delta[i] × weight_j)`` where ``weight_j = base_purchase_prob_j / sum(base_purchase_prob_k)`` distributes the scalar severity delta across categories proportionally to their base purchase probability (design doc D9). ``severity_delta`` is the arm-specific severity delta: - Control arm: ``np.zeros(n_control)`` (no treatment effect). - Treatment arm: ``m1[treatment_indices] - m0[treatment_indices]``. Revenue for each converter = sum of ``price_j ~ Normal(base_price_j, price_std_j)`` clipped to ``base_price_j / 2`` minimum, for all categories that fired. Empty-cart fallback forces the cheapest category. Non-converters always get zero revenue. Parameters ---------- cart_config: Cart revenue model configuration with category definitions. severity_delta: Per-visitor scalar severity delta (logit-additive shift on category purchase probabilities). Control arm passes zeros; treatment arm passes ``m1 - m0``. Shape: ``(n_arm,)``. converter_mask: Boolean mask of shape ``(n_arm,)`` — ``True`` for converters. rng: Caller-controlled RNG for reproducibility. Returns ------- np.ndarray Shape ``(n_arm,)`` — revenue for each visitor. Non-converters have zero revenue. """ categories = cart_config.categories n_visitors = len(converter_mask) revenue = np.zeros(n_visitors, dtype=float) if not np.any(converter_mask): return revenue # Pre-compute category weights for distributing severity delta. base_probs = np.array([c.base_purchase_prob for c in categories], dtype=float) total_base_prob = base_probs.sum() weights = base_probs / total_base_prob # shape: (n_categories,) # Logit of base purchase probabilities. logit_base = np.log(base_probs / (1.0 - base_probs)) # shape: (n_categories,) converter_indices = np.where(converter_mask)[0] n_converters = len(converter_indices) # severity_delta for each converter: shape (n_converters,) conv_delta = severity_delta[converter_indices] # Purchase probabilities: shape (n_converters, n_categories) # For each converter i and category j: # logit_prob_j(i) = logit(base_j) + conv_delta[i] * weight_j logit_probs = logit_base[np.newaxis, :] + conv_delta[:, np.newaxis] * weights[np.newaxis, :] purchase_probs = 1.0 / (1.0 + np.exp(-logit_probs)) # sigmoid # Fire Bernoulli for each (converter, category) pair. # fired[i, j] = True if converter i buys category j. fired = rng.random((n_converters, len(categories))) < purchase_probs # Sample prices for each (converter, category) pair. # Price ~ Normal(base_price_j, price_std_j), clipped to base_price_j / 2 minimum. base_prices = np.array([c.base_price for c in categories], dtype=float) price_stds = np.array([c.price_std for c in categories], dtype=float) if np.any(price_stds > 0): raw_prices = rng.normal( loc=base_prices[np.newaxis, :], scale=np.maximum(price_stds[np.newaxis, :], 1e-12), size=(n_converters, len(categories)), ) min_prices = base_prices / 2.0 prices = np.maximum(raw_prices, min_prices[np.newaxis, :]) else: # All price_std == 0: deterministic prices. prices = np.broadcast_to(base_prices[np.newaxis, :], (n_converters, len(categories))).copy() # Revenue = sum of prices for fired categories. conv_revenue = (fired * prices).sum(axis=1) # shape: (n_converters,) # Empty-cart fallback: if no categories fired, force cheapest category. empty_cart = ~np.any(fired, axis=1) # shape: (n_converters,) if np.any(empty_cart): cheapest_idx = int(np.argmin(base_prices)) cheapest_price = prices[np.where(empty_cart)[0], cheapest_idx] conv_revenue[empty_cart] = cheapest_price revenue[converter_indices] = conv_revenue return revenue # --------------------------------------------------------------------------- # Assignment and observed outcome sampling (Task 7.1) # --------------------------------------------------------------------------- def _make_variant_from_observed( name: str, experiment_id: str, features: pd.DataFrame, converted: np.ndarray, revenue: np.ndarray, ) -> VariantData: """Build a VariantData from raw outcome arrays and the feature DataFrame. The visitor DataFrame includes all feature columns from ``features`` plus the required VISITOR_SCHEMA columns. No truth-derived columns are included. """ n = len(converted) converted_bool = converted.astype(bool) data: dict[str, object] = { "visitor_id": [f"{name}-v{i}" for i in range(n)], "experiment_id": experiment_id, "variant": name, "converted": converted_bool, "revenue": revenue.astype(float), "orders_count": converted_bool.astype(np.int64), "sessions_count": np.ones(n, dtype=np.int64), } # Append feature columns (no truth fields — features are observed). for col in features.columns: data[col] = features[col].values visitors = pd.DataFrame(data) return VariantData( name=name, visitors=visitors, n_visitors=n, n_conversions=int(converted_bool.sum()), total_revenue=float(revenue.sum()), )
[docs] def assign_and_observe( features: pd.DataFrame, truth_result: TruthResult, assignment: AssignmentConfig, metric_id: str, rng: np.random.Generator, revenue_model: CartRevenueConfig | None = None, treatment_assignment: np.ndarray | None = None, ) -> list[VariantData]: """Assign visitors to treatment levels and sample observed outcomes. Dispatches to one of two paths based on the truth structure: **Paired/binary path** (``treatment_assignment is None`` AND ``truth_result.p_values is None``): the existing K=2 binary/hurdle sampling unchanged — RNG draws are byte-identical to the original implementation. Returns a 2-element list ``[control, treatment]``. **Multi-treatment path** (``truth_result.p_values is not None``, i.e. K≥3 truth): generalised to K treatment levels. Assignment is either: - **Internal randomisation** (``treatment_assignment is None``): uses ``assignment.treatment_probabilities`` to draw per-visitor treatment indices from a categorical distribution. - **External hook** (``treatment_assignment`` supplied): the caller provides a per-visitor array of treatment indices in ``[0, K−1]`` (e.g. a sequential-experiment cell's policy routing). When ``revenue_model`` is a ``CartRevenueConfig``, cart-based revenue sampling is used (K=2 only). Supplying a ``revenue_model`` for K≥3 raises ``NotImplementedError``. Parameters ---------- features: Feature matrix produced by ``sample_features``, one row per visitor. truth_result: Output of ``compute_potential_outcomes``. K=2: carries ``p0_values`` and ``p1_values`` (and hurdle arrays). K≥3: carries ``p_values`` (and ``m_values``/``sigma_values`` for hurdle metrics). assignment: Treatment assignment parameters. Paired path uses ``treatment_allocation``; multi-treatment internal path uses ``treatment_probabilities``. metric_id: Canonical metric identifier — determines revenue sampling path. rng: Caller-controlled RNG for reproducible assignment and sampling. revenue_model: Revenue sampling strategy for hurdle metrics. ``None`` (default) uses LogNormal sampling. A ``CartRevenueConfig`` activates cart-based sampling (K=2 only). Ignored for binary metrics. treatment_assignment: External per-visitor treatment-index array (ints in ``[0, K−1]``). When supplied, bypasses internal randomisation and drives group membership exactly — suitable for the sequential-experiment loop where a cell's policy routes visitors by features. Requires multi-treatment truth (``truth_result.p_values`` non-None); supplying this with K=2 paired truth raises ``ValueError``. Returns ------- list[VariantData] Length K. Index 0 = control (``name="control"``); indices 1…K−1 are ``name="treatment_k"`` for K≥3, or ``name="treatment"`` for K=2. All VariantData contain observed columns and feature columns only (no truth fields). Raises ------ ValueError If ``truth_result`` is missing required potential outcome arrays, if ``treatment_assignment`` is supplied with K=2 paired truth, or if ``treatment_assignment`` has invalid length or out-of-range indices. NotImplementedError If ``revenue_model`` is non-None and K≥3 truth is present. """ if treatment_assignment is None and truth_result.p_values is None: return _assign_and_observe_paired( features, truth_result, assignment, metric_id, rng, revenue_model ) return _assign_and_observe_multi( features, truth_result, assignment, metric_id, rng, revenue_model, treatment_assignment, )
def _assign_and_observe_paired( features: pd.DataFrame, truth_result: TruthResult, assignment: AssignmentConfig, metric_id: str, rng: np.random.Generator, revenue_model: CartRevenueConfig | None, ) -> list[VariantData]: """K=2 paired/binary observation path. The RNG call sequence is byte-identical to the original inline implementation — the K=2 snapshot baselines depend on this stream. """ if truth_result.p0_values is None or truth_result.p1_values is None: raise ValueError( "assign_and_observe: truth_result must carry p0_values and p1_values. " "Ensure compute_potential_outcomes populated these fields." ) n = len(features) p0 = truth_result.p0_values p1 = truth_result.p1_values # Step 1: assign each visitor to control (False) or treatment (True). treatment_mask = rng.random(n) < assignment.treatment_allocation control_mask = ~treatment_mask control_indices = np.where(control_mask)[0] treatment_indices = np.where(treatment_mask)[0] # Step 2: sample observed conversions from per-visitor Bernoulli. ctrl_converted = rng.random(len(control_indices)) < p0[control_indices] treat_converted = rng.random(len(treatment_indices)) < p1[treatment_indices] # Step 3: sample revenue — zero for binary, hurdle draw (lognormal or cart). if metric_id == "revenue_per_visitor": if truth_result.m0_values is None or truth_result.m1_values is None: raise ValueError( "assign_and_observe: hurdle metric requires m0_values and m1_values " "in truth_result." ) m0 = truth_result.m0_values m1 = truth_result.m1_values if isinstance(revenue_model, CartRevenueConfig): # Cart-based revenue: distribute severity delta across categories. # Control arm has zero severity delta (baseline purchase probabilities). # Treatment arm delta is (m1 - m0) distributed across categories. ctrl_severity_delta = np.zeros(len(control_indices), dtype=float) treat_severity_delta = m1[treatment_indices] - m0[treatment_indices] ctrl_revenue = _sample_cart_revenue( cart_config=revenue_model, severity_delta=ctrl_severity_delta, converter_mask=ctrl_converted, rng=rng, ) treat_revenue = _sample_cart_revenue( cart_config=revenue_model, severity_delta=treat_severity_delta, converter_mask=treat_converted, rng=rng, ) else: # Default: LogNormal sampling for converters. if truth_result.sigma0_values is None or truth_result.sigma1_values is None: raise ValueError( "assign_and_observe: hurdle metric with lognormal requires " "sigma0_values and sigma1_values in truth_result." ) sigma0 = truth_result.sigma0_values sigma1 = truth_result.sigma1_values # Control arm: LogNormal sampling for converters. ctrl_sigma = sigma0[control_indices] ctrl_mu = np.log(m0[control_indices]) - ctrl_sigma**2 / 2 ctrl_sampled = np.exp(rng.normal(ctrl_mu, ctrl_sigma)) ctrl_revenue = np.where(ctrl_converted, ctrl_sampled, 0.0) if not np.all(np.isfinite(ctrl_revenue)): raise ValueError( "Sampled control revenue contains non-finite values (extreme sigma?)" ) # Treatment arm: LogNormal sampling for converters. treat_sigma = sigma1[treatment_indices] treat_mu = np.log(m1[treatment_indices]) - treat_sigma**2 / 2 treat_sampled = np.exp(rng.normal(treat_mu, treat_sigma)) treat_revenue = np.where(treat_converted, treat_sampled, 0.0) if not np.all(np.isfinite(treat_revenue)): raise ValueError( "Sampled treatment revenue contains non-finite values (extreme sigma?)" ) elif metric_id == "conversion_rate": ctrl_revenue = np.zeros(len(control_indices), dtype=float) treat_revenue = np.zeros(len(treatment_indices), dtype=float) else: raise ValueError( f"assign_and_observe: unsupported metric_id {metric_id!r}" ) # Step 4: build VariantData for each arm. # Use the experiment_id placeholder "sim-v2" — caller provides via config; # the bundle assembly step sets the real experiment_id via build_bundle. ctrl_features = features.iloc[control_indices].reset_index(drop=True) treat_features = features.iloc[treatment_indices].reset_index(drop=True) control = _make_variant_from_observed( name="control", experiment_id="sim-v2", features=ctrl_features, converted=ctrl_converted, revenue=ctrl_revenue, ) treatment = _make_variant_from_observed( name="treatment", experiment_id="sim-v2", features=treat_features, converted=treat_converted, revenue=treat_revenue, ) return [control, treatment] def _assign_and_observe_multi( features: pd.DataFrame, truth_result: TruthResult, assignment: AssignmentConfig, metric_id: str, rng: np.random.Generator, revenue_model: CartRevenueConfig | None, treatment_assignment: np.ndarray | None, ) -> list[VariantData]: """K>=3 (or externally-assigned) multi-treatment observation path.""" if treatment_assignment is not None and truth_result.p_values is None: raise ValueError( "assign_and_observe: external treatment_assignment requires " "multi-treatment truth (p_values); the K=2 paired path uses AssignmentConfig" ) assert truth_result.p_values is not None K = len(truth_result.p_values) n = len(features) if revenue_model is not None: raise NotImplementedError( "assign_and_observe: K>=3 cart revenue is out of scope; lognormal only" ) # Determine per-visitor treatment index Z (length n, int in [0, K−1]). if treatment_assignment is not None: if len(treatment_assignment) != n: raise ValueError( f"assign_and_observe: treatment_assignment length {len(treatment_assignment)} " f"must equal n_visitors={n}" ) if not np.all((np.asarray(treatment_assignment) >= 0) & (np.asarray(treatment_assignment) < K)): raise ValueError( f"assign_and_observe: treatment_assignment entries must be in [0, {K - 1}]" ) Z = np.asarray(treatment_assignment) else: if assignment.treatment_probabilities is None: raise ValueError( "assign_and_observe: multi-treatment path requires " "assignment.treatment_probabilities (not None)" ) Z = rng.choice(K, size=n, p=np.array(assignment.treatment_probabilities)) # Build one VariantData per treatment level. variants: list[VariantData] = [] for k in range(K): idx = np.where(Z == k)[0] n_k = len(idx) p_k = truth_result.p_values[k] conversions = rng.random(n_k) < p_k[idx] if metric_id == "revenue_per_visitor": assert truth_result.m_values is not None and truth_result.sigma_values is not None m_k = truth_result.m_values[k] sigma_k = truth_result.sigma_values[k] m_idx = m_k[idx] sigma_idx = sigma_k[idx] mu_idx = np.log(m_idx) - sigma_idx**2 / 2 sampled = np.exp(rng.normal(mu_idx, sigma_idx)) revenue = np.where(conversions, sampled, 0.0) if not np.all(np.isfinite(revenue)): raise ValueError( f"assign_and_observe: sampled revenue for level k={k} " "contains non-finite values (extreme sigma?)" ) else: revenue = np.zeros(n_k, dtype=float) name = "control" if k == 0 else f"treatment_{k}" variant = _make_variant_from_observed( name=name, experiment_id="sim-v2", features=features.iloc[idx].reset_index(drop=True), converted=conversions, revenue=revenue, ) variants.append(variant) return variants # --------------------------------------------------------------------------- # Bundle assembly (Task 7.2) # ---------------------------------------------------------------------------
[docs] def build_bundle( variants: list[VariantData], truth_result: TruthResult, metric_mode: MetricMode, experiment_id: str, ) -> CalibrationBundle: """Assemble a CalibrationBundle from variant list and truth. Accepts a list of K VariantData (K=2 for paired designs, K≥3 for multi-treatment). Stamps the canonical ``experiment_id`` onto each variant, validates the observed data schema, and performs an alignment check appropriate for the truth form: - K=2 (``truth.cate_per_visitor`` non-None): standard CATE alignment. - K≥3 (``truth.cate_per_visitor is None``): aligns ``truth.p_per_visitor[0]`` against the total observed visitor count. Fail-closed: raises on any validation or alignment violation. Parameters ---------- variants: List of VariantData, one per treatment level. Index 0 = control, indices 1…K−1 = treated levels. truth_result: Internal truth intermediate from ``compute_potential_outcomes``. metric_mode: Metric configuration — provides ``metric_id`` for family derivation. experiment_id: Canonical experiment identifier stamped onto observed data. Returns ------- CalibrationBundle ``(observed, truth)`` — observed is validated and truth-free. Raises ------ SchemaViolation If observed data violates the visitor schema contract. AlignmentViolation If the truth array length doesn't match total observed visitors. ValueError If alignment check fails at the n_visitors level. """ # Rebuild VariantData with the real experiment_id stamped on each row. def _restamped(variant: VariantData, exp_id: str) -> VariantData: df = variant.visitors.copy() df["experiment_id"] = exp_id return VariantData( name=variant.name, visitors=df, n_visitors=variant.n_visitors, n_conversions=variant.n_conversions, total_revenue=variant.total_revenue, ) stamped_variants = [_restamped(v, experiment_id) for v in variants] observed = ObservedExperimentData( experiment_id=experiment_id, metric=metric_mode.metric_id, variants=stamped_variants, ) truth = build_truth(truth_result, metric_mode) # Fail-closed validation. validate_observed_data(observed, strict=True) # Alignment check — dispatch on truth form. if truth.cate_per_visitor is not None: # K=2 paired path: standard CATE alignment. validate_alignment(truth.cate_per_visitor, observed) else: # K≥3 multi-treatment path: p_per_visitor[0] carries the alignment # signal (each array has length = total n_visitors). assert truth.p_per_visitor is not None, ( "build_bundle: K>=3 truth must have p_per_visitor non-None" ) validate_alignment(truth.p_per_visitor[0], observed) return CalibrationBundle(observed=observed, truth=truth)
# --------------------------------------------------------------------------- # Public entrypoint # ---------------------------------------------------------------------------
[docs] def generate_v2_core(config: V2GeneratorConfig) -> CalibrationBundle: """Generate a CalibrationBundle via the v2 potential-outcomes pipeline. Pipeline stages: 1. sample_features — draw correlated mixed covariates X. 2. compute_potential_outcomes — derive per-visitor truth arrays. 3. assign_and_observe — assign treatment, sample observed outcomes. 4. build_bundle — assemble CalibrationBundle(observed, truth). Parameters ---------- config: Fully specified v2 generator configuration. Returns ------- CalibrationBundle ``(observed, truth)`` — observed is truth-free, truth contains per-visitor CATE aligned with concatenated visitor rows. """ rng = np.random.default_rng(config.seed) features = sample_features(config.feature_sampler, rng, config.n_visitors) truth_result = compute_potential_outcomes( features, config.metric_mode, revenue_model=config.revenue_model ) # Strip cluster_id before output — surfaces used it for per-cluster dispatch, # but it must not be visible to estimators (latent structure, not observed). features = features.drop(columns=["cluster_id"], errors="ignore") variants = assign_and_observe( features, truth_result, config.assignment, config.metric_mode.metric_id, rng, revenue_model=config.revenue_model, ) return build_bundle( variants=variants, truth_result=truth_result, metric_mode=config.metric_mode, experiment_id=config.experiment_id, )