"""V2 generator core — single potential-outcomes pipeline.
Public API
----------
generate_v2_core(config: V2GeneratorConfig) -> CalibrationBundle
Pipeline stages (filled in by subsequent tasks):
1. sample_features(config, rng) -> X: pd.DataFrame
2. compute_potential_outcomes(X, params) -> truth arrays
3. assign_and_sample(X, truth, assignment, rng) -> observed outcomes
4. build_bundle(observed, truth) -> CalibrationBundle
Config design
-------------
Typed frozen dataclasses with __post_init__ fail-closed validation.
Feature sampler is a discriminated union: CopulaConfig | MixtureConfig.
Both strategies produce the same output shape (X: pd.DataFrame).
Surfaces are Callable[[pd.DataFrame], np.ndarray].
"""
from __future__ import annotations
import dataclasses
from collections.abc import Callable
from typing import Any, Literal
import numpy as np
import pandas as pd
from scipy import stats
from pytyche.contracts import (
AlignedVisitorArray,
CalibrationBundle,
CalibrationTruth,
CartRevenueConfig,
MetricFamily,
ObservedExperimentData,
VariantData,
)
from pytyche.validation import validate_alignment, validate_observed_data
# ---------------------------------------------------------------------------
# Feature spec
# ---------------------------------------------------------------------------
FeatureKind = Literal["continuous", "categorical", "binary"]
_VALID_FEATURE_KINDS: frozenset[str] = frozenset(
{"continuous", "categorical", "binary"}
)
[docs]
@dataclasses.dataclass(frozen=True)
class FeatureSpec:
"""Specification for a single feature column.
Fields:
name: Column name. Must be non-empty.
kind: Feature type — ``"continuous"``, ``"categorical"``, or
``"binary"``.
"""
name: str
kind: FeatureKind
def __post_init__(self) -> None:
if not self.name:
raise ValueError("FeatureSpec: name must be non-empty")
if self.kind not in _VALID_FEATURE_KINDS:
raise ValueError(
f"FeatureSpec: kind must be one of {sorted(_VALID_FEATURE_KINDS)}, "
f"got {self.kind!r}"
)
# ---------------------------------------------------------------------------
# Feature sampler configs
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True)
class CopulaConfig:
"""Gaussian copula feature sampler configuration.
Generates correlated mixed covariates via multivariate normal with
configured correlation matrix and marginal transforms.
Fields:
features: Ordered feature specifications. Length determines the
dimension of the copula.
correlation: Square correlation matrix of shape
``(len(features), len(features))``. Must be positive semi-definite.
"""
features: tuple[FeatureSpec, ...]
correlation: np.ndarray
def __post_init__(self) -> None:
n = len(self.features)
if self.correlation.shape != (n, n):
raise ValueError(
f"CopulaConfig: correlation shape {self.correlation.shape} "
f"must match (len(features), len(features)) = ({n}, {n})"
)
# Validate correlation matrix properties.
if not np.allclose(self.correlation, self.correlation.T, atol=1e-12):
raise ValueError(
"CopulaConfig: correlation matrix must be symmetric"
)
if not np.allclose(np.diag(self.correlation), 1.0, atol=1e-12):
raise ValueError(
"CopulaConfig: correlation matrix diagonal must be all 1.0"
)
if np.linalg.eigvalsh(self.correlation).min() < -1e-10:
raise ValueError(
"CopulaConfig: correlation matrix must be positive semi-definite"
)
corr_copy = self.correlation.copy()
corr_copy.flags.writeable = False
object.__setattr__(self, "correlation", corr_copy)
[docs]
@dataclasses.dataclass(frozen=True)
class MixtureConfig:
"""Mixture-of-populations feature sampler configuration.
Generates covariates by sampling latent cluster membership and then
drawing from cluster-conditional feature distributions.
Fields:
features: Feature specifications shared across all clusters.
weights: Per-cluster mixture weights. All must be strictly positive.
Length must equal ``len(cluster_params)``.
cluster_params: Per-cluster parameter dicts keyed by feature name
(or by arbitrary convention per stage implementation).
"""
features: tuple[FeatureSpec, ...]
weights: tuple[float, ...]
cluster_params: tuple[dict[str, Any], ...]
def __post_init__(self) -> None:
if any(w <= 0 for w in self.weights):
raise ValueError(
"MixtureConfig: all weight values must be strictly positive"
)
if len(self.weights) != len(self.cluster_params):
raise ValueError(
f"MixtureConfig: len(weights)={len(self.weights)} must equal "
f"len(cluster_params)={len(self.cluster_params)}"
)
#: Discriminated union of feature sampler strategy configs.
FeatureSamplerConfig = CopulaConfig | MixtureConfig
# ---------------------------------------------------------------------------
# Surface config
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True)
class SurfaceConfig:
"""HTE surface definition as a callable.
The callable receives the feature DataFrame ``X`` (one row per visitor)
and returns a 1-D numpy array of per-visitor values (e.g. conversion
probabilities, severity means).
Fields:
fn: ``(X: pd.DataFrame) -> np.ndarray`` — must return a 1-D array
of length ``len(X)``.
"""
fn: Callable[[pd.DataFrame], np.ndarray]
# ---------------------------------------------------------------------------
# Assignment config
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True)
class AssignmentConfig:
"""Fixed (forced) treatment-assignment policy.
Configures the FIXED/forced assignment used for simple randomized designs
and SBC — every visitor's treatment is drawn from a feature-independent
distribution (binary 50/50 or a uniform/fixed per-treatment vector).
Policy-routed assignment — where a sequential experiment's cell routes each
visitor to a treatment based on their features — is NOT encoded here; it is
supplied externally at observation time. This config is just one assignment
policy (the forced/randomized one).
**Binary (paired) form:** set ``treatment_allocation`` (fraction in (0, 1));
leave ``treatment_probabilities`` as ``None``.
**Multi-treatment form:** set ``treatment_probabilities`` to a length-K tuple
of positive fractions summing to 1.0 (within 1e-9) — the fixed per-treatment
assignment probabilities (uniform = balanced forced exploration).
``treatment_allocation`` is unused in this form but must still be in (0, 1).
Fields:
treatment_allocation: Fraction of visitors assigned to treatment in the
binary form. Must be in the open interval (0, 1). Default 0.5.
treatment_probabilities: Fixed per-treatment assignment probabilities for
K≥2 (the forced/randomized policy). ``None`` selects the binary
paired form. Length must be ≥ 2, every entry strictly in (0, 1),
and entries must sum to 1.0 (±1e-9).
"""
treatment_allocation: float = 0.5
treatment_probabilities: tuple[float, ...] | None = dataclasses.field(default=None, kw_only=True)
@property
def n_treatments(self) -> int:
return 2 if self.treatment_probabilities is None else len(self.treatment_probabilities)
def __post_init__(self) -> None:
if not (0.0 < self.treatment_allocation < 1.0):
raise ValueError(
f"AssignmentConfig: treatment_allocation must be in (0, 1), "
f"got {self.treatment_allocation}"
)
if self.treatment_probabilities is not None:
if len(self.treatment_probabilities) < 2:
raise ValueError(
f"AssignmentConfig: treatment_probabilities must have at least 2 entries, "
f"got {len(self.treatment_probabilities)}"
)
if not all(0.0 < v < 1.0 for v in self.treatment_probabilities):
raise ValueError(
"AssignmentConfig: every entry in treatment_probabilities must be in (0, 1)"
)
if abs(sum(self.treatment_probabilities) - 1.0) > 1e-9:
raise ValueError(
f"AssignmentConfig: treatment_probabilities must sum to 1, "
f"got {sum(self.treatment_probabilities)}"
)
# ---------------------------------------------------------------------------
# Metric mode
# ---------------------------------------------------------------------------
_VALID_METRIC_IDS: frozenset[str] = frozenset(
{"conversion_rate", "revenue_per_visitor"}
)
[docs]
@dataclasses.dataclass(frozen=True)
class MetricMode:
"""Metric-specific potential outcome surfaces.
**K=2 (paired) form:** populate the six paired surface fields
(``p0_surface``, ``p1_surface``, and for hurdle metrics ``m0/m1/sigma0/sigma1``);
leave ``p_surfaces``, ``m_surfaces``, and ``sigma_surfaces`` as ``None``.
**Multi-treatment form:** leave all six paired surface fields as ``None`` and
populate ``p_surfaces`` (length K) and, for hurdle metrics,
``m_surfaces`` and ``sigma_surfaces`` (also length K). K must be ≥ 3 —
the K = 2 case uses the paired form (the truth-computation dispatch routes
list-form surfaces through the multi-treatment path only at K ≥ 3).
Every surface is a function of the individual user's attributes ``X``. A
``SurfaceConfig`` carries the potential-outcome surface for ONE treatment
level: ``p_surfaces[k](X)`` is the conversion potential outcome a user with
features ``X`` WOULD show under treatment level ``k`` (level 0 = control,
the prognostic baseline; levels 1…K−1 = treated). The surfaces are not
properties of an assignment cohort — assignment (randomization, and in a
sequential experiment a cell's policy) merely selects which of a user's K
potential outcomes is observed.
For binary metrics (``"conversion_rate"``): only conversion probability
surfaces are needed; severity/sigma surfaces must be ``None``.
For hurdle metrics (``"revenue_per_visitor"``): conversion, severity, and
sigma surfaces are all required.
Fields:
metric_id: Canonical metric name. Must be a known metric.
p0_surface: Control (level-0) conversion potential-outcome surface (paired form).
p1_surface: Treated (level-1) conversion potential-outcome surface (paired form).
m0_surface: Control severity (AOV) surface. Hurdle paired form only.
m1_surface: Treated severity (AOV) surface. Hurdle paired form only.
sigma0_surface: Control LogNormal dispersion surface. Hurdle paired only.
sigma1_surface: Treated LogNormal dispersion surface. Hurdle paired only.
p_surfaces: Conversion potential-outcome surfaces, one per treatment
level (length K, index 0 = control). Each is a function of user
attributes ``X``.
m_surfaces: Severity potential-outcome surfaces, one per treatment
level (length K). Hurdle multi-treatment form.
sigma_surfaces: LogNormal dispersion surfaces, one per treatment level
(length K). Hurdle multi-treatment form.
"""
metric_id: str
p0_surface: SurfaceConfig | None = None
p1_surface: SurfaceConfig | None = None
m0_surface: SurfaceConfig | None = None
m1_surface: SurfaceConfig | None = None
sigma0_surface: SurfaceConfig | None = None
sigma1_surface: SurfaceConfig | None = None
p_surfaces: list[SurfaceConfig] | None = dataclasses.field(default=None, kw_only=True)
m_surfaces: list[SurfaceConfig] | None = dataclasses.field(default=None, kw_only=True)
sigma_surfaces: list[SurfaceConfig] | None = dataclasses.field(default=None, kw_only=True)
@property
def n_treatments(self) -> int:
return 2 if self.p_surfaces is None else len(self.p_surfaces)
def __post_init__(self) -> None:
if self.p_surfaces is not None:
# multi-treatment (K>=3) list form
if (
self.p0_surface is not None
or self.p1_surface is not None
or self.m0_surface is not None
or self.m1_surface is not None
or self.sigma0_surface is not None
or self.sigma1_surface is not None
):
raise ValueError(
"MetricMode: cannot mix paired and multi-treatment surface forms"
)
if len(self.p_surfaces) < 3:
raise ValueError(
f"MetricMode: the list surface form is for K >= 3 "
f"(got {len(self.p_surfaces)} entries); use the paired "
"p0/p1 surface form for K = 2"
)
if self.metric_id not in _VALID_METRIC_IDS:
raise ValueError(
f"MetricMode: metric_id must be one of "
f"{sorted(_VALID_METRIC_IDS)}, got {self.metric_id!r}"
)
if self.metric_id == "conversion_rate":
if self.m_surfaces is not None or self.sigma_surfaces is not None:
raise ValueError(
"MetricMode: conversion_rate must not have hurdle surfaces "
"(m_surfaces and sigma_surfaces must be None)"
)
if self.metric_id == "revenue_per_visitor":
if self.m_surfaces is None or self.sigma_surfaces is None:
raise ValueError(
"MetricMode: revenue_per_visitor requires m_surfaces and sigma_surfaces"
)
if (
len(self.m_surfaces) != len(self.p_surfaces)
or len(self.sigma_surfaces) != len(self.p_surfaces)
):
raise ValueError(
"MetricMode: m_surfaces and sigma_surfaces must have the same "
"length as p_surfaces"
)
else:
# Paired (K=2) form — run existing validation exactly as before
if self.p0_surface is None or self.p1_surface is None:
raise ValueError(
"MetricMode: paired form requires p0_surface and p1_surface"
)
if self.metric_id not in _VALID_METRIC_IDS:
raise ValueError(
f"MetricMode: metric_id must be one of "
f"{sorted(_VALID_METRIC_IDS)}, got {self.metric_id!r}"
)
if self.metric_id == "conversion_rate":
if self.m0_surface is not None or self.m1_surface is not None:
raise ValueError(
"MetricMode: conversion_rate must not have hurdle surfaces "
"(m0_surface and m1_surface must be None)"
)
if self.sigma0_surface is not None or self.sigma1_surface is not None:
raise ValueError(
"MetricMode: conversion_rate must not have sigma surfaces "
"(sigma0_surface and sigma1_surface must be None)"
)
if self.metric_id == "revenue_per_visitor":
if self.m0_surface is None or self.m1_surface is None:
raise ValueError(
"MetricMode: revenue_per_visitor requires both hurdle surfaces "
"(m0_surface and m1_surface must not be None)"
)
if self.sigma0_surface is None or self.sigma1_surface is None:
raise ValueError(
"MetricMode: revenue_per_visitor requires both sigma surfaces "
"(sigma0_surface and sigma1_surface must not be None)"
)
# ---------------------------------------------------------------------------
# Top-level generator config
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True)
class V2GeneratorConfig:
"""Top-level configuration for the v2 core generator pipeline.
Fields:
n_visitors: Total visitor count across all arms. Must be positive.
feature_sampler: Feature sampling strategy config
(``CopulaConfig`` or ``MixtureConfig``).
metric_mode: Metric and potential-outcome surface definitions.
assignment: Treatment assignment parameters.
seed: Random seed for full pipeline reproducibility.
experiment_id: Identifier for the generated experiment. Default
``"sim-v2"``.
revenue_model: Revenue sampling strategy for hurdle metrics.
``None`` (default) uses LogNormal sampling (existing behavior).
A ``CartRevenueConfig`` activates cart-based revenue sampling —
per-category Bernoulli draws whose prices sum to converter revenue.
Ignored for binary (``conversion_rate``) metrics.
"""
n_visitors: int
feature_sampler: FeatureSamplerConfig
metric_mode: MetricMode
assignment: AssignmentConfig
seed: int
experiment_id: str = "sim-v2"
revenue_model: CartRevenueConfig | None = None
def __post_init__(self) -> None:
if self.n_visitors <= 0:
raise ValueError(
f"V2GeneratorConfig: n_visitors must be positive, "
f"got {self.n_visitors}"
)
# ---------------------------------------------------------------------------
# Feature sampling
# ---------------------------------------------------------------------------
# Number of discrete categories for "categorical" features in the copula
# strategy. Copula draws are uniform; we threshold into equal-probability bins.
_COPULA_N_CATEGORIES: int = 4
def _sample_copula(
config: CopulaConfig, rng: np.random.Generator, n: int
) -> pd.DataFrame:
"""Sample features via Gaussian copula with marginal transforms.
Algorithm:
1. Draw ``n`` samples from a zero-mean multivariate normal with the
configured correlation matrix.
2. Convert to uniform marginals via the standard normal CDF.
3. Apply per-feature inverse transforms based on ``FeatureKind``:
- continuous: keep the raw normal values (standard-normal marginals).
- categorical: threshold uniform quantiles into ``_COPULA_N_CATEGORIES``
equal-probability bins labelled as strings.
- binary: threshold the raw normal value at 0 → bool.
"""
d = len(config.features)
# Step 1: draw from zero-mean MVN with the configured correlation matrix.
z = rng.multivariate_normal(mean=np.zeros(d), cov=config.correlation, size=n)
# Step 2: convert to uniform marginals for categorical thresholding.
u = stats.norm.cdf(z) # shape (n, d), values in (0, 1)
columns: dict[str, object] = {}
for j, feat in enumerate(config.features):
if feat.kind == "continuous":
columns[feat.name] = z[:, j].astype(float)
elif feat.kind == "categorical":
# Partition (0, 1] into equal bins; label as "cat_0", "cat_1", …
bins = np.linspace(0.0, 1.0, _COPULA_N_CATEGORIES + 1)
indices = np.digitize(u[:, j], bins[1:], right=True)
# np.digitize with right=True and bins[1:] gives 0-based indices.
label_names = [f"cat_{i}" for i in range(_COPULA_N_CATEGORIES)]
raw = [label_names[i] for i in np.clip(indices, 0, _COPULA_N_CATEGORIES - 1)]
# Use pd.Categorical so the dtype is CategoricalDtype (not StringDtype).
columns[feat.name] = pd.Categorical(raw, categories=label_names)
elif feat.kind == "binary":
columns[feat.name] = (z[:, j] > 0.0)
else:
raise ValueError(
f"sample_features: unknown feature kind {feat.kind!r} "
f"for feature {feat.name!r}"
)
return pd.DataFrame(columns)
def _sample_mixture(
config: MixtureConfig, rng: np.random.Generator, n: int
) -> pd.DataFrame:
"""Sample features via mixture-of-populations with cluster-conditional distributions.
Algorithm:
1. Normalize ``config.weights`` to sum to 1.
2. Assign each of ``n`` visitors to a cluster using ``rng.choice`` weighted
by the normalized weights.
3. For each visitor, sample features from the cluster-conditional parameters:
- continuous: ``{name}_mean`` and ``{name}_std`` from ``cluster_params``.
- categorical: ``{name}_probs`` dict (label -> probability) from
``cluster_params``. Labels are drawn proportionally.
- binary: ``{name}_prob`` from ``cluster_params``.
4. Include a ``cluster_id`` column (integer cluster index) in the output.
"""
weights = np.array(config.weights, dtype=float)
normalized_weights = weights / weights.sum()
n_clusters = len(config.cluster_params)
# Assign each visitor to a cluster.
cluster_assignments: np.ndarray = rng.choice(n_clusters, size=n, p=normalized_weights)
columns: dict[str, object] = {}
for feat in config.features:
if feat.kind == "continuous":
values = np.empty(n, dtype=float)
for cluster_idx in range(n_clusters):
mask = cluster_assignments == cluster_idx
if not np.any(mask):
continue
params = config.cluster_params[cluster_idx]
mean = float(params[f"{feat.name}_mean"])
std = float(params[f"{feat.name}_std"])
values[mask] = rng.normal(loc=mean, scale=std, size=int(mask.sum()))
columns[feat.name] = values
elif feat.kind == "categorical":
values_arr = np.empty(n, dtype=object)
for cluster_idx in range(n_clusters):
mask = cluster_assignments == cluster_idx
count = int(mask.sum())
if count == 0:
continue
params = config.cluster_params[cluster_idx]
probs_dict: dict[str, float] = params[f"{feat.name}_probs"]
labels = list(probs_dict.keys())
probs = np.array(list(probs_dict.values()), dtype=float)
probs = probs / probs.sum()
chosen = rng.choice(labels, size=count, p=probs)
values_arr[mask] = chosen
# Use pd.Categorical so the dtype is CategoricalDtype.
all_labels: list[str] = []
for params in config.cluster_params:
for label in params[f"{feat.name}_probs"].keys():
if label not in all_labels:
all_labels.append(label)
columns[feat.name] = pd.Categorical(values_arr, categories=all_labels)
elif feat.kind == "binary":
values_bool = np.empty(n, dtype=bool)
for cluster_idx in range(n_clusters):
mask = cluster_assignments == cluster_idx
count = int(mask.sum())
if count == 0:
continue
params = config.cluster_params[cluster_idx]
prob = float(params[f"{feat.name}_prob"])
values_bool[mask] = rng.random(size=count) < prob
columns[feat.name] = values_bool
else:
raise ValueError(
f"_sample_mixture: unknown feature kind {feat.kind!r} "
f"for feature {feat.name!r}"
)
columns["cluster_id"] = cluster_assignments
return pd.DataFrame(columns)
[docs]
def sample_features(
config: FeatureSamplerConfig, rng: np.random.Generator, n: int
) -> pd.DataFrame:
"""Sample a feature matrix from the given sampler config.
Parameters
----------
config:
Either a ``CopulaConfig`` (Gaussian copula strategy) or a
``MixtureConfig`` (mixture-of-populations strategy).
rng:
Explicit RNG instance — caller controls seeding for reproducibility.
n:
Number of rows to sample.
Returns
-------
pd.DataFrame
Shape ``(n, len(config.features))``. Column names and dtypes follow
the feature specs in ``config.features``. For ``MixtureConfig`` an
additional ``cluster_id`` column is included (used internally for
per-cluster surface dispatch; stripped before output by
``generate_v2_core``).
Raises
------
TypeError
Raised for unsupported config types.
ValueError
Raised for unknown feature kinds within a config.
"""
if isinstance(config, CopulaConfig):
return _sample_copula(config, rng, n)
elif isinstance(config, MixtureConfig):
return _sample_mixture(config, rng, n)
else:
raise TypeError(
f"sample_features: unsupported config type {type(config).__name__!r}"
)
# ---------------------------------------------------------------------------
# Potential outcomes — truth computation
# ---------------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True)
class TruthResult:
"""Internal intermediate result from compute_potential_outcomes.
Holds per-visitor CATE arrays and population summary statistics before
assembly into a CalibrationBundle. This is not part of the public
contract — build_bundle (task 7.2) constructs CalibrationTruth from this.
**K-dispatch:**
K=2 (paired form): populates the legacy paired ``*_values`` fields
(``p0_values``, ``p1_values``, and for hurdle ``m0/m1/sigma0/sigma1_values``)
plus ``cate_per_visitor``; leaves ``p_values``, ``m_values``,
``sigma_values`` as ``None``.
K≥3 (multi-treatment form): populates ``p_values`` (length K), and for
hurdle ``m_values`` / ``sigma_values`` (also length K); sets
``cate_per_visitor=None`` and leaves all six legacy paired ``*_values``
fields as ``None``.
Fields:
cate_per_visitor: Per-visitor CATE, aligned 1:1 with input rows.
Populated at K=2; ``None`` at K≥3.
effect: Population mean CATE (mean of cate_per_visitor.values at K=2;
mean of the best-treatment contrast at K≥3 — provisional scalar).
effect_components: Named decomposition of the population effect.
Binary K=2: ``{"conv_effect": float}``
Hurdle K=2: ``{"conv_effect": float, "aov_effect": float}``
K≥3: ``{"treatment_1_effect": float, ..., "treatment_{K-1}_effect": float}``
conv_per_visitor: Per-visitor conversion component. Only populated
for hurdle K=2 metrics; ``None`` for binary or K≥3.
aov_per_visitor: Per-visitor AOV component. Only populated for
hurdle K=2 metrics; ``None`` for binary or K≥3.
p0_values: Per-visitor control conversion probabilities. Populated
by both binary and hurdle K=2 computations for use in outcome sampling.
p1_values: Per-visitor treatment conversion probabilities. K=2 only.
m0_values: Per-visitor control severity means (raw surface output).
Hurdle K=2 only; ``None`` for binary or K≥3.
m1_values: Per-visitor treatment severity means (raw surface output).
Hurdle K=2 only; ``None`` for binary or K≥3.
m0_effective: Per-visitor control expected revenue conditional on
conversion. For lognormal, equals ``m0_values``. For cart, this
is the analytical expected cart revenue. ``None`` for binary or K≥3.
m1_effective: Per-visitor treatment expected revenue conditional on
conversion. For lognormal, equals ``m1_values``. For cart, this
is the analytical expected cart revenue. ``None`` for binary or K≥3.
sigma0_values: Per-visitor control LogNormal dispersion. Hurdle K=2
only; ``None`` for binary or K≥3.
sigma1_values: Per-visitor treatment LogNormal dispersion. Hurdle K=2
only; ``None`` for binary or K≥3.
p_values: Per-visitor conversion potential outcomes, one array per
treatment level (length K, index 0 = control). K≥3 only;
``None`` at K=2.
m_values: Per-visitor severity potential outcomes (length K). K≥3
hurdle only; ``None`` for K=2 or conversion_rate.
sigma_values: Per-visitor LogNormal dispersion (length K). K≥3 hurdle
only; ``None`` for K=2 or conversion_rate.
"""
cate_per_visitor: AlignedVisitorArray | None
effect: float
effect_components: dict[str, float]
conv_per_visitor: AlignedVisitorArray | None = None
aov_per_visitor: AlignedVisitorArray | None = None
p0_values: np.ndarray | None = None
p1_values: np.ndarray | None = None
m0_values: np.ndarray | None = None
m1_values: np.ndarray | None = None
m0_effective: np.ndarray | None = None
m1_effective: np.ndarray | None = None
sigma0_values: np.ndarray | None = None
sigma1_values: np.ndarray | None = None
p_values: list[np.ndarray] | None = None
m_values: list[np.ndarray] | None = None
sigma_values: list[np.ndarray] | None = None
def _validate_surface_output(
values: object,
n_expected: int,
surface_name: str,
*,
lower_bound: float | None = None,
upper_bound: float | None = None,
strict_lower: bool = False,
) -> np.ndarray:
"""Validate and coerce a surface output to a well-formed 1-D ndarray.
Checks:
1. Coerce to ndarray via ``np.asarray``.
2. Must be 1-D.
3. Length must equal ``n_expected``.
4. All values must be finite (no NaN or Inf).
5. Optional domain bounds (inclusive or strict-lower).
Returns the validated ndarray.
Raises
------
ValueError
On any violation — with a message naming the surface.
"""
arr = np.asarray(values, dtype=float)
if arr.ndim != 1:
raise ValueError(
f"Surface '{surface_name}': expected 1-D array, got ndim={arr.ndim}"
)
if len(arr) != n_expected:
raise ValueError(
f"Surface '{surface_name}': expected length {n_expected}, got {len(arr)}"
)
if not np.all(np.isfinite(arr)):
raise ValueError(
f"Surface '{surface_name}': contains non-finite values (NaN or Inf)"
)
if lower_bound is not None:
if strict_lower:
if not np.all(arr > lower_bound):
raise ValueError(
f"Surface '{surface_name}': all values must be > {lower_bound}"
)
else:
if not np.all(arr >= lower_bound):
raise ValueError(
f"Surface '{surface_name}': all values must be >= {lower_bound}"
)
if upper_bound is not None:
if not np.all(arr <= upper_bound):
raise ValueError(
f"Surface '{surface_name}': all values must be <= {upper_bound}"
)
return arr
def _compute_binary_truth(
features: pd.DataFrame, metric_mode: MetricMode
) -> TruthResult:
"""Compute per-visitor CATE for a binary (conversion_rate) metric."""
assert metric_mode.p0_surface is not None and metric_mode.p1_surface is not None, (
"_compute_binary_truth called with paired-form MetricMode missing p0/p1 surfaces"
)
n = len(features)
p0_values = _validate_surface_output(
metric_mode.p0_surface.fn(features), n, "p0", lower_bound=0.0, upper_bound=1.0
)
p1_values = _validate_surface_output(
metric_mode.p1_surface.fn(features), n, "p1", lower_bound=0.0, upper_bound=1.0
)
cate = p1_values - p0_values
effect = float(np.mean(cate))
return TruthResult(
cate_per_visitor=AlignedVisitorArray(values=cate, n_visitors=n),
effect=effect,
effect_components={"conv_effect": effect},
p0_values=p0_values,
p1_values=p1_values,
)
def _compute_cart_expected_revenue(
cart_config: CartRevenueConfig, severity_delta: np.ndarray
) -> np.ndarray:
"""Analytical expected cart revenue per converter given severity delta.
For each visitor *i* and category *j*:
purchase_prob_j(i) = sigmoid(logit(base_j) + severity_delta[i] × weight_j)
Expected revenue includes the empty-cart fallback (cheapest category forced):
E[rev | convert, i] = Σ_j(purchase_prob_j(i) × base_price_j)
+ P(empty_cart_i) × cheapest_price
Parameters
----------
cart_config:
Cart revenue model configuration.
severity_delta:
Per-visitor logit-additive shift. Shape ``(n,)``. Pass zeros for
the control arm.
Returns
-------
np.ndarray
Shape ``(n,)`` — expected revenue conditional on conversion.
"""
categories = cart_config.categories
base_probs = np.array([c.base_purchase_prob for c in categories], dtype=float)
base_prices = np.array([c.base_price for c in categories], dtype=float)
price_stds = np.array([c.price_std for c in categories], dtype=float)
weights = base_probs / base_probs.sum()
logit_base = np.log(base_probs / (1.0 - base_probs))
# Compute expected price per category under the truncated normal sampling
# distribution: X ~ N(mu, sigma) truncated below at a = mu / 2.
# E[X | X > a] = mu + sigma * phi(alpha) / (1 - Phi(alpha))
# where alpha = (a - mu) / sigma.
expected_prices = np.empty_like(base_prices)
for j in range(len(categories)):
if price_stds[j] == 0.0:
expected_prices[j] = base_prices[j]
else:
a = base_prices[j] / 2.0
alpha = (a - base_prices[j]) / price_stds[j]
expected_prices[j] = base_prices[j] + price_stds[j] * (
stats.norm.pdf(alpha) / (1.0 - stats.norm.cdf(alpha))
)
# (n_visitors, n_categories)
logit_probs = (
logit_base[np.newaxis, :]
+ severity_delta[:, np.newaxis] * weights[np.newaxis, :]
)
purchase_probs = 1.0 / (1.0 + np.exp(-logit_probs)) # sigmoid
# Expected revenue from each category using truncated-normal expected prices.
expected_per_category = purchase_probs * expected_prices[np.newaxis, :] # (n, n_cat)
# Empty-cart probability: prod(1 - purchase_prob_j).
empty_prob = np.prod(1.0 - purchase_probs, axis=1) # (n,)
# Empty-cart fallback uses the truncated mean of the cheapest category's price.
cheapest_idx = int(np.argmin(base_prices))
cheapest_expected_price = float(expected_prices[cheapest_idx])
return expected_per_category.sum(axis=1) + empty_prob * cheapest_expected_price
def _compute_hurdle_truth(
features: pd.DataFrame,
metric_mode: MetricMode,
revenue_model: CartRevenueConfig | None = None,
) -> TruthResult:
"""Compute per-visitor CATE and additive decomposition for a hurdle metric."""
assert metric_mode.p0_surface is not None and metric_mode.p1_surface is not None, (
"_compute_hurdle_truth called with paired-form MetricMode missing p0/p1 surfaces"
)
n = len(features)
p0 = _validate_surface_output(
metric_mode.p0_surface.fn(features), n, "p0", lower_bound=0.0, upper_bound=1.0
)
p1 = _validate_surface_output(
metric_mode.p1_surface.fn(features), n, "p1", lower_bound=0.0, upper_bound=1.0
)
if metric_mode.m0_surface is None or metric_mode.m1_surface is None:
raise ValueError(
"_compute_hurdle_truth: m0_surface and m1_surface must not be None"
)
m0 = _validate_surface_output(
metric_mode.m0_surface.fn(features), n, "m0", lower_bound=0.0, strict_lower=True
)
m1 = _validate_surface_output(
metric_mode.m1_surface.fn(features), n, "m1", lower_bound=0.0, strict_lower=True
)
if metric_mode.sigma0_surface is None or metric_mode.sigma1_surface is None:
raise ValueError(
"_compute_hurdle_truth: sigma0_surface and sigma1_surface must not be None"
)
sigma0 = _validate_surface_output(
metric_mode.sigma0_surface.fn(features), n, "sigma0", lower_bound=0.0, strict_lower=True
)
sigma1 = _validate_surface_output(
metric_mode.sigma1_surface.fn(features), n, "sigma1", lower_bound=0.0, strict_lower=True
)
# Effective "expected revenue | convert" values for the decomposition.
# For lognormal, the raw surface values (m0, m1) ARE the expected revenue.
# For cart, we compute analytical expected cart revenue per visitor.
if revenue_model is not None:
severity_delta_treat = m1 - m0
m0_eff = _compute_cart_expected_revenue(
revenue_model, np.zeros(n, dtype=float)
)
m1_eff = _compute_cart_expected_revenue(revenue_model, severity_delta_treat)
else:
m0_eff = m0
m1_eff = m1
cate = p1 * m1_eff - p0 * m0_eff
conv = (p1 - p0) * m0_eff
aov = p1 * (m1_eff - m0_eff)
# Per-visitor decomposition identity check (ValueError, not assert).
if not np.allclose(cate, conv + aov, atol=1e-10):
max_err = float(np.max(np.abs(cate - (conv + aov))))
raise ValueError(
f"Per-visitor decomposition identity violated: "
f"max |cate - (conv + aov)| = {max_err:.2e}"
)
effect = float(np.mean(cate))
return TruthResult(
cate_per_visitor=AlignedVisitorArray(values=cate, n_visitors=n),
effect=effect,
effect_components={
"conv_effect": float(np.mean(conv)),
"aov_effect": float(np.mean(aov)),
},
conv_per_visitor=AlignedVisitorArray(values=conv, n_visitors=n),
aov_per_visitor=AlignedVisitorArray(values=aov, n_visitors=n),
p0_values=p0,
p1_values=p1,
m0_values=m0,
m1_values=m1,
m0_effective=m0_eff,
m1_effective=m1_eff,
sigma0_values=sigma0,
sigma1_values=sigma1,
)
def _compute_truth_multi(
features: pd.DataFrame,
metric_mode: MetricMode,
revenue_model: CartRevenueConfig | None = None,
) -> TruthResult:
"""Compute truth for the multi-treatment (K≥3) form.
Evaluates all K per-treatment-level potential-outcome surfaces and derives
K−1 treatment-vs-control contrasts. Only the lognormal severity model is
supported; passing a non-None ``revenue_model`` raises ``NotImplementedError``.
Parameters
----------
features:
Feature matrix, one row per visitor.
metric_mode:
Multi-treatment MetricMode (``p_surfaces`` non-None, length K≥3).
revenue_model:
Must be ``None``; K≥3 cart revenue is out of scope.
Returns
-------
TruthResult
K≥3 form: ``p_values`` / ``m_values`` / ``sigma_values`` (length K);
``cate_per_visitor=None``; legacy paired fields all ``None``.
"""
if revenue_model is not None:
raise NotImplementedError(
"K>=3 cart revenue is out of scope; lognormal only"
)
assert metric_mode.p_surfaces is not None, (
"_compute_truth_multi called without p_surfaces"
)
n = len(features)
K = metric_mode.n_treatments
p_values: list[np.ndarray] = [
_validate_surface_output(
metric_mode.p_surfaces[k].fn(features),
n,
f"p{k}",
lower_bound=0.0,
upper_bound=1.0,
)
for k in range(K)
]
if metric_mode.metric_id == "revenue_per_visitor":
assert metric_mode.m_surfaces is not None and metric_mode.sigma_surfaces is not None
m_values: list[np.ndarray] | None = [
_validate_surface_output(
metric_mode.m_surfaces[k].fn(features),
n,
f"m{k}",
lower_bound=0.0,
strict_lower=True,
)
for k in range(K)
]
sigma_values: list[np.ndarray] | None = [
_validate_surface_output(
metric_mode.sigma_surfaces[k].fn(features),
n,
f"sigma{k}",
lower_bound=0.0,
strict_lower=True,
)
for k in range(K)
]
# K−1 per-treatment contrasts: treatment_j vs. control (index 0).
effect_components: dict[str, float] = {
f"treatment_{j + 1}_effect": float(
np.mean(p_values[j + 1] * m_values[j + 1] - p_values[0] * m_values[0])
)
for j in range(K - 1)
}
else:
# conversion_rate — no severity surfaces.
m_values = None
sigma_values = None
effect_components = {
f"treatment_{j + 1}_effect": float(
np.mean(p_values[j + 1] - p_values[0])
)
for j in range(K - 1)
}
# Provisional best-treatment scalar: max over the K−1 contrasts.
# This is a provisional decision-relevant summary; the K≥3 SBC operates
# per-contrast (Phase 12) rather than collapsing to one scalar.
effect = float(max(effect_components.values()))
return TruthResult(
cate_per_visitor=None,
effect=effect,
effect_components=effect_components,
p_values=p_values,
m_values=m_values,
sigma_values=sigma_values,
)
[docs]
def compute_potential_outcomes(
features: pd.DataFrame,
metric_mode: MetricMode,
revenue_model: CartRevenueConfig | None = None,
) -> TruthResult:
"""Compute per-visitor potential outcomes and derive truth arrays.
Dispatches based on ``metric_mode.n_treatments``:
- ``n_treatments == 2`` (paired form): routes to ``_compute_binary_truth``
or ``_compute_hurdle_truth`` exactly as before.
- ``n_treatments >= 3`` (multi-treatment form): routes to
``_compute_truth_multi``.
Parameters
----------
features:
Feature matrix, one row per visitor. Produced by ``sample_features``.
metric_mode:
Metric and surface definitions. Determines which computation path
to use (binary, hurdle, or multi-treatment).
revenue_model:
Cart revenue configuration. When provided, the hurdle decomposition
uses analytical expected cart revenue instead of the raw severity
surface values. Ignored for binary metrics. Not supported for K≥3.
Returns
-------
TruthResult
Per-visitor CATE, population effect, and decomposition components.
Raises
------
ValueError
If ``metric_mode.metric_id`` is not a supported metric.
NotImplementedError
If ``revenue_model`` is non-None and ``n_treatments >= 3``.
"""
if metric_mode.n_treatments >= 3:
return _compute_truth_multi(features, metric_mode, revenue_model)
if metric_mode.metric_id == "conversion_rate":
return _compute_binary_truth(features, metric_mode)
elif metric_mode.metric_id == "revenue_per_visitor":
return _compute_hurdle_truth(features, metric_mode, revenue_model)
else:
raise ValueError(
f"compute_potential_outcomes: unsupported metric_id "
f"{metric_mode.metric_id!r}"
)
# ---------------------------------------------------------------------------
# Metric-family mapping
# ---------------------------------------------------------------------------
_METRIC_FAMILY: dict[str, MetricFamily] = {
"conversion_rate": MetricFamily.BINARY,
"revenue_per_visitor": MetricFamily.HURDLE_REAL,
}
def _metric_to_family(metric_id: str) -> MetricFamily:
"""Map a canonical metric_id to its MetricFamily.
Raises
------
ValueError
If metric_id is not a known metric.
"""
try:
return _METRIC_FAMILY[metric_id]
except KeyError:
raise ValueError(
f"_metric_to_family: unknown metric_id {metric_id!r}. "
f"Known: {sorted(_METRIC_FAMILY)}"
) from None
# ---------------------------------------------------------------------------
# Truth construction
# ---------------------------------------------------------------------------
[docs]
def build_truth(truth_result: TruthResult, metric_mode: MetricMode) -> CalibrationTruth:
"""Construct a CalibrationTruth from computed potential outcomes.
Bridges the internal ``TruthResult`` (pipeline intermediate) to the
public ``CalibrationTruth`` contract.
**K-dispatch:**
K=2 (paired form): populates the legacy 1-D fields (``cate_per_visitor``,
``conv/aov_cate_per_visitor``, ``p0/p1/m0/m1_per_visitor``); leaves the
three new list fields (``contrast_cate_per_visitor``, ``p_per_visitor``,
``m_per_visitor``) as ``None``.
K≥3 (multi-treatment form): populates ``contrast_cate_per_visitor`` (length
K−1), ``p_per_visitor`` (length K), and ``m_per_visitor`` (length K, or
``None`` for ``conversion_rate``); leaves all legacy 1-D fields as ``None``.
Parameters
----------
truth_result:
Output of ``compute_potential_outcomes`` — holds per-visitor CATE,
population effect, and decomposition components.
metric_mode:
The metric configuration used to compute truth — provides
``metric_id`` for family derivation.
Returns
-------
CalibrationTruth
Typed, frozen ground truth.
"""
family = _metric_to_family(metric_mode.metric_id)
# Multi-treatment (K≥3) branch — truth_result.p_values is the signal.
if truth_result.p_values is not None:
p_vals = truth_result.p_values
m_vals = truth_result.m_values
K = len(p_vals)
n = len(p_vals[0])
# K−1 per-treatment contrasts vs. control.
if m_vals is not None:
contrast_cate_per_visitor = [
AlignedVisitorArray(
values=p_vals[j + 1] * m_vals[j + 1] - p_vals[0] * m_vals[0],
n_visitors=n,
)
for j in range(K - 1)
]
else:
# conversion_rate: contrast is p[j+1] - p[0].
contrast_cate_per_visitor = [
AlignedVisitorArray(
values=p_vals[j + 1] - p_vals[0],
n_visitors=n,
)
for j in range(K - 1)
]
p_per_visitor = [
AlignedVisitorArray(values=p_vals[k], n_visitors=n) for k in range(K)
]
m_per_visitor: list[AlignedVisitorArray] | None = (
[AlignedVisitorArray(values=m_vals[k], n_visitors=n) for k in range(K)]
if m_vals is not None
else None
)
return CalibrationTruth(
effect=truth_result.effect,
metric_id=metric_mode.metric_id,
metric_family=family,
effect_components=truth_result.effect_components,
cate_per_visitor=None,
contrast_cate_per_visitor=contrast_cate_per_visitor,
p_per_visitor=p_per_visitor,
m_per_visitor=m_per_visitor,
)
# K=2 (paired) branch — existing behavior unchanged.
assert truth_result.cate_per_visitor is not None, (
"build_truth: K=2 TruthResult must have cate_per_visitor non-None"
)
n = truth_result.cate_per_visitor.n_visitors
# Wire through raw surface values as AlignedVisitorArrays for hurdle metrics.
p0_per_visitor = (
AlignedVisitorArray(values=truth_result.p0_values, n_visitors=n)
if truth_result.p0_values is not None and metric_mode.metric_id == "revenue_per_visitor"
else None
)
p1_per_visitor = (
AlignedVisitorArray(values=truth_result.p1_values, n_visitors=n)
if truth_result.p1_values is not None and metric_mode.metric_id == "revenue_per_visitor"
else None
)
# Use effective m values (= analytical cart expected revenue when cart,
# = raw surface values when lognormal) for CalibrationTruth so downstream
# calibration compares against the correct ground truth.
m0_vals_k2 = truth_result.m0_effective if truth_result.m0_effective is not None else truth_result.m0_values
m1_vals_k2 = truth_result.m1_effective if truth_result.m1_effective is not None else truth_result.m1_values
m0_per_visitor_k2 = (
AlignedVisitorArray(values=m0_vals_k2, n_visitors=n)
if m0_vals_k2 is not None
else None
)
m1_per_visitor_k2 = (
AlignedVisitorArray(values=m1_vals_k2, n_visitors=n)
if m1_vals_k2 is not None
else None
)
return CalibrationTruth(
effect=truth_result.effect,
metric_id=metric_mode.metric_id,
metric_family=family,
effect_components=truth_result.effect_components,
cate_per_visitor=truth_result.cate_per_visitor,
conv_cate_per_visitor=truth_result.conv_per_visitor,
aov_cate_per_visitor=truth_result.aov_per_visitor,
p0_per_visitor=p0_per_visitor,
p1_per_visitor=p1_per_visitor,
m0_per_visitor=m0_per_visitor_k2,
m1_per_visitor=m1_per_visitor_k2,
)
# ---------------------------------------------------------------------------
# Composable surface helpers
# ---------------------------------------------------------------------------
[docs]
def add_surfaces(s1: SurfaceConfig, s2: SurfaceConfig) -> SurfaceConfig:
"""Return a surface whose output is the element-wise sum of two surfaces.
Parameters
----------
s1, s2:
Input surfaces. Both receive the same feature DataFrame ``X``.
Returns
-------
SurfaceConfig
Surface computing ``s1(X) + s2(X)``.
"""
return SurfaceConfig(fn=lambda X: s1.fn(X) + s2.fn(X))
[docs]
def multiply_surfaces(s1: SurfaceConfig, s2: SurfaceConfig) -> SurfaceConfig:
"""Return a surface whose output is the element-wise product of two surfaces.
Parameters
----------
s1, s2:
Input surfaces. Both receive the same feature DataFrame ``X``.
Returns
-------
SurfaceConfig
Surface computing ``s1(X) * s2(X)``.
"""
return SurfaceConfig(fn=lambda X: s1.fn(X) * s2.fn(X))
[docs]
def sigmoid_surface(s: SurfaceConfig) -> SurfaceConfig:
"""Return a surface applying the sigmoid transform to an inner surface.
The sigmoid maps any real value to the open interval (0, 1)::
sigmoid(z) = 1 / (1 + exp(-z))
Useful for composing linear or nonlinear terms into a valid probability
surface without manual clipping.
Parameters
----------
s:
Inner surface whose output is passed through sigmoid.
Returns
-------
SurfaceConfig
Surface computing ``sigmoid(s(X))``.
"""
return SurfaceConfig(fn=lambda X: 1.0 / (1.0 + np.exp(-s.fn(X))))
[docs]
def threshold_surface(
s: SurfaceConfig,
cutoff: float,
above: float,
below: float,
) -> SurfaceConfig:
"""Return a step-function surface based on a threshold applied to an inner surface.
Produces a discontinuous surface suitable for stress-testing HTE
estimators that assume smooth effect functions::
result[i] = above if s(X)[i] > cutoff
below otherwise
Parameters
----------
s:
Inner surface to threshold.
cutoff:
Decision boundary. Strict ``>`` comparison.
above:
Output value where ``s(X) > cutoff``.
below:
Output value where ``s(X) <= cutoff``.
Returns
-------
SurfaceConfig
Step-function surface.
"""
return SurfaceConfig(fn=lambda X: np.where(s.fn(X) > cutoff, above, below))
# ---------------------------------------------------------------------------
# Cart-based revenue sampler (Task 15.2)
# ---------------------------------------------------------------------------
def _sample_cart_revenue(
cart_config: CartRevenueConfig,
severity_delta: np.ndarray,
converter_mask: np.ndarray,
rng: np.random.Generator,
) -> np.ndarray:
"""Sample revenue for each visitor using the cart-based model.
For each converter, fires a Bernoulli for each product category. The
purchase probability for category ``j`` and visitor ``i`` is:
``purchase_prob_j(i) = sigmoid(logit(base_purchase_prob_j)
+ severity_delta[i] × weight_j)``
where ``weight_j = base_purchase_prob_j / sum(base_purchase_prob_k)``
distributes the scalar severity delta across categories proportionally
to their base purchase probability (design doc D9).
``severity_delta`` is the arm-specific severity delta:
- Control arm: ``np.zeros(n_control)`` (no treatment effect).
- Treatment arm: ``m1[treatment_indices] - m0[treatment_indices]``.
Revenue for each converter = sum of ``price_j ~ Normal(base_price_j, price_std_j)``
clipped to ``base_price_j / 2`` minimum, for all categories that fired.
Empty-cart fallback forces the cheapest category.
Non-converters always get zero revenue.
Parameters
----------
cart_config:
Cart revenue model configuration with category definitions.
severity_delta:
Per-visitor scalar severity delta (logit-additive shift on category
purchase probabilities). Control arm passes zeros; treatment arm
passes ``m1 - m0``. Shape: ``(n_arm,)``.
converter_mask:
Boolean mask of shape ``(n_arm,)`` — ``True`` for converters.
rng:
Caller-controlled RNG for reproducibility.
Returns
-------
np.ndarray
Shape ``(n_arm,)`` — revenue for each visitor. Non-converters have
zero revenue.
"""
categories = cart_config.categories
n_visitors = len(converter_mask)
revenue = np.zeros(n_visitors, dtype=float)
if not np.any(converter_mask):
return revenue
# Pre-compute category weights for distributing severity delta.
base_probs = np.array([c.base_purchase_prob for c in categories], dtype=float)
total_base_prob = base_probs.sum()
weights = base_probs / total_base_prob # shape: (n_categories,)
# Logit of base purchase probabilities.
logit_base = np.log(base_probs / (1.0 - base_probs)) # shape: (n_categories,)
converter_indices = np.where(converter_mask)[0]
n_converters = len(converter_indices)
# severity_delta for each converter: shape (n_converters,)
conv_delta = severity_delta[converter_indices]
# Purchase probabilities: shape (n_converters, n_categories)
# For each converter i and category j:
# logit_prob_j(i) = logit(base_j) + conv_delta[i] * weight_j
logit_probs = logit_base[np.newaxis, :] + conv_delta[:, np.newaxis] * weights[np.newaxis, :]
purchase_probs = 1.0 / (1.0 + np.exp(-logit_probs)) # sigmoid
# Fire Bernoulli for each (converter, category) pair.
# fired[i, j] = True if converter i buys category j.
fired = rng.random((n_converters, len(categories))) < purchase_probs
# Sample prices for each (converter, category) pair.
# Price ~ Normal(base_price_j, price_std_j), clipped to base_price_j / 2 minimum.
base_prices = np.array([c.base_price for c in categories], dtype=float)
price_stds = np.array([c.price_std for c in categories], dtype=float)
if np.any(price_stds > 0):
raw_prices = rng.normal(
loc=base_prices[np.newaxis, :],
scale=np.maximum(price_stds[np.newaxis, :], 1e-12),
size=(n_converters, len(categories)),
)
min_prices = base_prices / 2.0
prices = np.maximum(raw_prices, min_prices[np.newaxis, :])
else:
# All price_std == 0: deterministic prices.
prices = np.broadcast_to(base_prices[np.newaxis, :], (n_converters, len(categories))).copy()
# Revenue = sum of prices for fired categories.
conv_revenue = (fired * prices).sum(axis=1) # shape: (n_converters,)
# Empty-cart fallback: if no categories fired, force cheapest category.
empty_cart = ~np.any(fired, axis=1) # shape: (n_converters,)
if np.any(empty_cart):
cheapest_idx = int(np.argmin(base_prices))
cheapest_price = prices[np.where(empty_cart)[0], cheapest_idx]
conv_revenue[empty_cart] = cheapest_price
revenue[converter_indices] = conv_revenue
return revenue
# ---------------------------------------------------------------------------
# Assignment and observed outcome sampling (Task 7.1)
# ---------------------------------------------------------------------------
def _make_variant_from_observed(
name: str,
experiment_id: str,
features: pd.DataFrame,
converted: np.ndarray,
revenue: np.ndarray,
) -> VariantData:
"""Build a VariantData from raw outcome arrays and the feature DataFrame.
The visitor DataFrame includes all feature columns from ``features`` plus
the required VISITOR_SCHEMA columns. No truth-derived columns are included.
"""
n = len(converted)
converted_bool = converted.astype(bool)
data: dict[str, object] = {
"visitor_id": [f"{name}-v{i}" for i in range(n)],
"experiment_id": experiment_id,
"variant": name,
"converted": converted_bool,
"revenue": revenue.astype(float),
"orders_count": converted_bool.astype(np.int64),
"sessions_count": np.ones(n, dtype=np.int64),
}
# Append feature columns (no truth fields — features are observed).
for col in features.columns:
data[col] = features[col].values
visitors = pd.DataFrame(data)
return VariantData(
name=name,
visitors=visitors,
n_visitors=n,
n_conversions=int(converted_bool.sum()),
total_revenue=float(revenue.sum()),
)
[docs]
def assign_and_observe(
features: pd.DataFrame,
truth_result: TruthResult,
assignment: AssignmentConfig,
metric_id: str,
rng: np.random.Generator,
revenue_model: CartRevenueConfig | None = None,
treatment_assignment: np.ndarray | None = None,
) -> list[VariantData]:
"""Assign visitors to treatment levels and sample observed outcomes.
Dispatches to one of two paths based on the truth structure:
**Paired/binary path** (``treatment_assignment is None`` AND
``truth_result.p_values is None``): the existing K=2 binary/hurdle
sampling unchanged — RNG draws are byte-identical to the original
implementation. Returns a 2-element list ``[control, treatment]``.
**Multi-treatment path** (``truth_result.p_values is not None``, i.e. K≥3
truth): generalised to K treatment levels. Assignment is either:
- **Internal randomisation** (``treatment_assignment is None``): uses
``assignment.treatment_probabilities`` to draw per-visitor treatment
indices from a categorical distribution.
- **External hook** (``treatment_assignment`` supplied): the caller
provides a per-visitor array of treatment indices in ``[0, K−1]``
(e.g. a sequential-experiment cell's policy routing).
When ``revenue_model`` is a ``CartRevenueConfig``, cart-based revenue
sampling is used (K=2 only). Supplying a ``revenue_model`` for K≥3
raises ``NotImplementedError``.
Parameters
----------
features:
Feature matrix produced by ``sample_features``, one row per visitor.
truth_result:
Output of ``compute_potential_outcomes``. K=2: carries ``p0_values``
and ``p1_values`` (and hurdle arrays). K≥3: carries ``p_values``
(and ``m_values``/``sigma_values`` for hurdle metrics).
assignment:
Treatment assignment parameters. Paired path uses
``treatment_allocation``; multi-treatment internal path uses
``treatment_probabilities``.
metric_id:
Canonical metric identifier — determines revenue sampling path.
rng:
Caller-controlled RNG for reproducible assignment and sampling.
revenue_model:
Revenue sampling strategy for hurdle metrics. ``None`` (default)
uses LogNormal sampling. A ``CartRevenueConfig`` activates cart-based
sampling (K=2 only). Ignored for binary metrics.
treatment_assignment:
External per-visitor treatment-index array (ints in ``[0, K−1]``).
When supplied, bypasses internal randomisation and drives group
membership exactly — suitable for the sequential-experiment loop
where a cell's policy routes visitors by features. Requires
multi-treatment truth (``truth_result.p_values`` non-None); supplying
this with K=2 paired truth raises ``ValueError``.
Returns
-------
list[VariantData]
Length K. Index 0 = control (``name="control"``); indices 1…K−1 are
``name="treatment_k"`` for K≥3, or ``name="treatment"`` for K=2.
All VariantData contain observed columns and feature columns only
(no truth fields).
Raises
------
ValueError
If ``truth_result`` is missing required potential outcome arrays, if
``treatment_assignment`` is supplied with K=2 paired truth, or if
``treatment_assignment`` has invalid length or out-of-range indices.
NotImplementedError
If ``revenue_model`` is non-None and K≥3 truth is present.
"""
if treatment_assignment is None and truth_result.p_values is None:
return _assign_and_observe_paired(
features, truth_result, assignment, metric_id, rng, revenue_model
)
return _assign_and_observe_multi(
features,
truth_result,
assignment,
metric_id,
rng,
revenue_model,
treatment_assignment,
)
def _assign_and_observe_paired(
features: pd.DataFrame,
truth_result: TruthResult,
assignment: AssignmentConfig,
metric_id: str,
rng: np.random.Generator,
revenue_model: CartRevenueConfig | None,
) -> list[VariantData]:
"""K=2 paired/binary observation path.
The RNG call sequence is byte-identical to the original inline
implementation — the K=2 snapshot baselines depend on this stream.
"""
if truth_result.p0_values is None or truth_result.p1_values is None:
raise ValueError(
"assign_and_observe: truth_result must carry p0_values and p1_values. "
"Ensure compute_potential_outcomes populated these fields."
)
n = len(features)
p0 = truth_result.p0_values
p1 = truth_result.p1_values
# Step 1: assign each visitor to control (False) or treatment (True).
treatment_mask = rng.random(n) < assignment.treatment_allocation
control_mask = ~treatment_mask
control_indices = np.where(control_mask)[0]
treatment_indices = np.where(treatment_mask)[0]
# Step 2: sample observed conversions from per-visitor Bernoulli.
ctrl_converted = rng.random(len(control_indices)) < p0[control_indices]
treat_converted = rng.random(len(treatment_indices)) < p1[treatment_indices]
# Step 3: sample revenue — zero for binary, hurdle draw (lognormal or cart).
if metric_id == "revenue_per_visitor":
if truth_result.m0_values is None or truth_result.m1_values is None:
raise ValueError(
"assign_and_observe: hurdle metric requires m0_values and m1_values "
"in truth_result."
)
m0 = truth_result.m0_values
m1 = truth_result.m1_values
if isinstance(revenue_model, CartRevenueConfig):
# Cart-based revenue: distribute severity delta across categories.
# Control arm has zero severity delta (baseline purchase probabilities).
# Treatment arm delta is (m1 - m0) distributed across categories.
ctrl_severity_delta = np.zeros(len(control_indices), dtype=float)
treat_severity_delta = m1[treatment_indices] - m0[treatment_indices]
ctrl_revenue = _sample_cart_revenue(
cart_config=revenue_model,
severity_delta=ctrl_severity_delta,
converter_mask=ctrl_converted,
rng=rng,
)
treat_revenue = _sample_cart_revenue(
cart_config=revenue_model,
severity_delta=treat_severity_delta,
converter_mask=treat_converted,
rng=rng,
)
else:
# Default: LogNormal sampling for converters.
if truth_result.sigma0_values is None or truth_result.sigma1_values is None:
raise ValueError(
"assign_and_observe: hurdle metric with lognormal requires "
"sigma0_values and sigma1_values in truth_result."
)
sigma0 = truth_result.sigma0_values
sigma1 = truth_result.sigma1_values
# Control arm: LogNormal sampling for converters.
ctrl_sigma = sigma0[control_indices]
ctrl_mu = np.log(m0[control_indices]) - ctrl_sigma**2 / 2
ctrl_sampled = np.exp(rng.normal(ctrl_mu, ctrl_sigma))
ctrl_revenue = np.where(ctrl_converted, ctrl_sampled, 0.0)
if not np.all(np.isfinite(ctrl_revenue)):
raise ValueError(
"Sampled control revenue contains non-finite values (extreme sigma?)"
)
# Treatment arm: LogNormal sampling for converters.
treat_sigma = sigma1[treatment_indices]
treat_mu = np.log(m1[treatment_indices]) - treat_sigma**2 / 2
treat_sampled = np.exp(rng.normal(treat_mu, treat_sigma))
treat_revenue = np.where(treat_converted, treat_sampled, 0.0)
if not np.all(np.isfinite(treat_revenue)):
raise ValueError(
"Sampled treatment revenue contains non-finite values (extreme sigma?)"
)
elif metric_id == "conversion_rate":
ctrl_revenue = np.zeros(len(control_indices), dtype=float)
treat_revenue = np.zeros(len(treatment_indices), dtype=float)
else:
raise ValueError(
f"assign_and_observe: unsupported metric_id {metric_id!r}"
)
# Step 4: build VariantData for each arm.
# Use the experiment_id placeholder "sim-v2" — caller provides via config;
# the bundle assembly step sets the real experiment_id via build_bundle.
ctrl_features = features.iloc[control_indices].reset_index(drop=True)
treat_features = features.iloc[treatment_indices].reset_index(drop=True)
control = _make_variant_from_observed(
name="control",
experiment_id="sim-v2",
features=ctrl_features,
converted=ctrl_converted,
revenue=ctrl_revenue,
)
treatment = _make_variant_from_observed(
name="treatment",
experiment_id="sim-v2",
features=treat_features,
converted=treat_converted,
revenue=treat_revenue,
)
return [control, treatment]
def _assign_and_observe_multi(
features: pd.DataFrame,
truth_result: TruthResult,
assignment: AssignmentConfig,
metric_id: str,
rng: np.random.Generator,
revenue_model: CartRevenueConfig | None,
treatment_assignment: np.ndarray | None,
) -> list[VariantData]:
"""K>=3 (or externally-assigned) multi-treatment observation path."""
if treatment_assignment is not None and truth_result.p_values is None:
raise ValueError(
"assign_and_observe: external treatment_assignment requires "
"multi-treatment truth (p_values); the K=2 paired path uses AssignmentConfig"
)
assert truth_result.p_values is not None
K = len(truth_result.p_values)
n = len(features)
if revenue_model is not None:
raise NotImplementedError(
"assign_and_observe: K>=3 cart revenue is out of scope; lognormal only"
)
# Determine per-visitor treatment index Z (length n, int in [0, K−1]).
if treatment_assignment is not None:
if len(treatment_assignment) != n:
raise ValueError(
f"assign_and_observe: treatment_assignment length {len(treatment_assignment)} "
f"must equal n_visitors={n}"
)
if not np.all((np.asarray(treatment_assignment) >= 0) & (np.asarray(treatment_assignment) < K)):
raise ValueError(
f"assign_and_observe: treatment_assignment entries must be in [0, {K - 1}]"
)
Z = np.asarray(treatment_assignment)
else:
if assignment.treatment_probabilities is None:
raise ValueError(
"assign_and_observe: multi-treatment path requires "
"assignment.treatment_probabilities (not None)"
)
Z = rng.choice(K, size=n, p=np.array(assignment.treatment_probabilities))
# Build one VariantData per treatment level.
variants: list[VariantData] = []
for k in range(K):
idx = np.where(Z == k)[0]
n_k = len(idx)
p_k = truth_result.p_values[k]
conversions = rng.random(n_k) < p_k[idx]
if metric_id == "revenue_per_visitor":
assert truth_result.m_values is not None and truth_result.sigma_values is not None
m_k = truth_result.m_values[k]
sigma_k = truth_result.sigma_values[k]
m_idx = m_k[idx]
sigma_idx = sigma_k[idx]
mu_idx = np.log(m_idx) - sigma_idx**2 / 2
sampled = np.exp(rng.normal(mu_idx, sigma_idx))
revenue = np.where(conversions, sampled, 0.0)
if not np.all(np.isfinite(revenue)):
raise ValueError(
f"assign_and_observe: sampled revenue for level k={k} "
"contains non-finite values (extreme sigma?)"
)
else:
revenue = np.zeros(n_k, dtype=float)
name = "control" if k == 0 else f"treatment_{k}"
variant = _make_variant_from_observed(
name=name,
experiment_id="sim-v2",
features=features.iloc[idx].reset_index(drop=True),
converted=conversions,
revenue=revenue,
)
variants.append(variant)
return variants
# ---------------------------------------------------------------------------
# Bundle assembly (Task 7.2)
# ---------------------------------------------------------------------------
[docs]
def build_bundle(
variants: list[VariantData],
truth_result: TruthResult,
metric_mode: MetricMode,
experiment_id: str,
) -> CalibrationBundle:
"""Assemble a CalibrationBundle from variant list and truth.
Accepts a list of K VariantData (K=2 for paired designs, K≥3 for
multi-treatment). Stamps the canonical ``experiment_id`` onto each
variant, validates the observed data schema, and performs an alignment
check appropriate for the truth form:
- K=2 (``truth.cate_per_visitor`` non-None): standard CATE alignment.
- K≥3 (``truth.cate_per_visitor is None``): aligns ``truth.p_per_visitor[0]``
against the total observed visitor count.
Fail-closed: raises on any validation or alignment violation.
Parameters
----------
variants:
List of VariantData, one per treatment level. Index 0 = control,
indices 1…K−1 = treated levels.
truth_result:
Internal truth intermediate from ``compute_potential_outcomes``.
metric_mode:
Metric configuration — provides ``metric_id`` for family derivation.
experiment_id:
Canonical experiment identifier stamped onto observed data.
Returns
-------
CalibrationBundle
``(observed, truth)`` — observed is validated and truth-free.
Raises
------
SchemaViolation
If observed data violates the visitor schema contract.
AlignmentViolation
If the truth array length doesn't match total observed visitors.
ValueError
If alignment check fails at the n_visitors level.
"""
# Rebuild VariantData with the real experiment_id stamped on each row.
def _restamped(variant: VariantData, exp_id: str) -> VariantData:
df = variant.visitors.copy()
df["experiment_id"] = exp_id
return VariantData(
name=variant.name,
visitors=df,
n_visitors=variant.n_visitors,
n_conversions=variant.n_conversions,
total_revenue=variant.total_revenue,
)
stamped_variants = [_restamped(v, experiment_id) for v in variants]
observed = ObservedExperimentData(
experiment_id=experiment_id,
metric=metric_mode.metric_id,
variants=stamped_variants,
)
truth = build_truth(truth_result, metric_mode)
# Fail-closed validation.
validate_observed_data(observed, strict=True)
# Alignment check — dispatch on truth form.
if truth.cate_per_visitor is not None:
# K=2 paired path: standard CATE alignment.
validate_alignment(truth.cate_per_visitor, observed)
else:
# K≥3 multi-treatment path: p_per_visitor[0] carries the alignment
# signal (each array has length = total n_visitors).
assert truth.p_per_visitor is not None, (
"build_bundle: K>=3 truth must have p_per_visitor non-None"
)
validate_alignment(truth.p_per_visitor[0], observed)
return CalibrationBundle(observed=observed, truth=truth)
# ---------------------------------------------------------------------------
# Public entrypoint
# ---------------------------------------------------------------------------
[docs]
def generate_v2_core(config: V2GeneratorConfig) -> CalibrationBundle:
"""Generate a CalibrationBundle via the v2 potential-outcomes pipeline.
Pipeline stages:
1. sample_features — draw correlated mixed covariates X.
2. compute_potential_outcomes — derive per-visitor truth arrays.
3. assign_and_observe — assign treatment, sample observed outcomes.
4. build_bundle — assemble CalibrationBundle(observed, truth).
Parameters
----------
config:
Fully specified v2 generator configuration.
Returns
-------
CalibrationBundle
``(observed, truth)`` — observed is truth-free, truth contains
per-visitor CATE aligned with concatenated visitor rows.
"""
rng = np.random.default_rng(config.seed)
features = sample_features(config.feature_sampler, rng, config.n_visitors)
truth_result = compute_potential_outcomes(
features, config.metric_mode, revenue_model=config.revenue_model
)
# Strip cluster_id before output — surfaces used it for per-cluster dispatch,
# but it must not be visible to estimators (latent structure, not observed).
features = features.drop(columns=["cluster_id"], errors="ignore")
variants = assign_and_observe(
features,
truth_result,
config.assignment,
config.metric_mode.metric_id,
rng,
revenue_model=config.revenue_model,
)
return build_bundle(
variants=variants,
truth_result=truth_result,
metric_mode=config.metric_mode,
experiment_id=config.experiment_id,
)