"""Sim-mode generator adapter — pytyche DGP templates as a ``Generator``.
:func:`simulated_experiment_generator` wraps a registered
``pytyche.generators`` scenario template as a callable conforming to the
:data:`~pytyche.experiment.sequential.Generator` contract: the loop calls
``generator(round_idx, plan)`` and receives ``(observed, truth)`` with
non-None truth. Visitors are allocated to the plan's cells by weight and
each visitor's treatment is drawn from the cell's ``policy.assign`` —
``TreePolicy`` routing happens at data-generation time, driving the
template's ``assign_and_observe`` through its external
``treatment_assignment`` hook (never the fixed ``treatment_probabilities``
path).
"""
from __future__ import annotations
import dataclasses
from collections.abc import Sequence
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
from pytyche._internal.extraction import extract_fit_arrays
from pytyche.contracts import (
RESERVED_CELL_COLUMN,
RESERVED_PROPENSITY_PREFIX,
AlignedVisitorArray,
CalibrationTruth,
ObservedExperimentData,
)
from pytyche.experiment.cells import (
BaselinePolicy,
Policy,
TreePolicy,
UniformPolicy,
)
from pytyche.generators.core import (
assign_and_observe,
build_bundle,
compute_potential_outcomes,
sample_features,
)
from pytyche.generators.scenarios import TEMPLATES
if TYPE_CHECKING:
from pytyche.experiment.recommendation import NextRoundPlan
from pytyche.experiment.sequential import Generator
__all__ = ["simulated_experiment_generator"]
# [docs]  (Sphinx source-link marker left over from page extraction; not code)
def simulated_experiment_generator(
    *,
    template: str,
    metric: str,
    effect_scale: float,
    K: int,
    seed: int = 0,
    treatment_names: Sequence[str] | None = None,
    **template_kwargs: object,
) -> Generator:
    """Wrap a registered DGP template as a sim-mode ``Generator`` callable.

    The returned callable conforms to the single data-source contract
    (:data:`~pytyche.experiment.sequential.Generator`): invoked as
    ``generator(round_idx, plan)``, it synthesizes one round of observed
    data from the plan's cell structure and returns
    ``(ObservedExperimentData, CalibrationTruth)`` — truth is always
    non-None (sim mode). Generation is deterministic per
    ``(seed, round_idx)``: replaying a round with the same plan reproduces
    it exactly, while distinct rounds draw fresh visitors.

    Variant naming defaults to the generator core's: index 0 is
    ``"control"`` and indices 1..K-1 are ``"treatment_1"`` ..
    ``"treatment_{K-1}"``. ``treatment_names=`` renames the variants for
    the experiment's narrative (position 0 is the baseline; positions
    follow the template's arm order) — presentation-only, the positional
    truth arrays are untouched. A ``pt.sequential_experiment(...)``
    driven by this adapter must pass the effective names as its
    ``treatments=`` universe, baseline first.

    Each returned visitors frame carries the reserved recording columns:

    - ``cell`` — the id of the plan cell that allocated the visitor
      (recorded at generation time; not derivable from the treatment
      received).
    - ``propensity_1`` .. ``propensity_{K-1}`` — the realized assignment
      propensities, i.e. the cell-weight blend
      ``P(Z = k | x, plan) = sum_c w_c * P_policy_c(k | x)``, where
      ``w_c`` are the cell weights normalized to sum to one — the same
      normalized weights the cell draw uses, so the recorded propensities
      describe the distribution the visitors were actually drawn from.

    Real-data generators carry the same recording obligations: the loop
    requires the ``cell`` column, and the reserved propensity columns are
    the documented channel for adaptively-assigned designs (``propensity``
    at K = 2).

    The adapter is K >= 3 only: the generator core's external
    ``treatment_assignment`` hook exists only for multi-arm truth, and the
    K = 2 paired sampling path is pinned out of bounds. For a K = 2
    sequential experiment, write a custom generator instead.

    Args:
        template: Name of a registered scenario template — a key of
            ``pytyche.generators.scenarios.TEMPLATES``.
        metric: Canonical metric identifier passed to the template (e.g.
            ``"revenue_per_visitor"``).
        effect_scale: Treatment-effect amplitude passed to the template.
        K: Number of treatment levels (control + K-1 treatments). Must be
            >= 3.
        seed: Master seed. Round ``r`` draws from
            ``np.random.default_rng(np.random.SeedSequence((seed, r)))``.
        treatment_names: Optional K unique variant names replacing the
            template's, in template arm order (position 0 = baseline).
        **template_kwargs: Extra keyword arguments forwarded to the
            template (e.g. ``n_nuisance=``, ``baseline_rate=``).

    Returns:
        A ``Generator``-conforming callable.

    Raises:
        ValueError: When ``template`` is not a registered template name,
            when ``K < 3``, when ``treatment_names`` is not exactly K
            unique names, or when the built template config does not
            carry K treatment levels.
    """
    if template not in TEMPLATES:
        registered = ", ".join(sorted(TEMPLATES))
        raise ValueError(
            f"Unknown template {template!r}. Registered templates: {registered}"
        )
    if K < 3:
        raise ValueError(
            f"simulated_experiment_generator supports K >= 3 only; got K={K}. "
            "The generator core's external treatment_assignment hook exists "
            "only for multi-arm truth — for a K = 2 sequential experiment, "
            "write a custom generator instead."
        )
    names = list(treatment_names) if treatment_names is not None else None
    if names is not None:
        if len(names) != K:
            raise ValueError(
                f"treatment_names must carry exactly K={K} names; "
                f"got {len(names)}"
            )
        if len(set(names)) != K:
            raise ValueError("treatment_names must be unique")

    # The template config is built once; every round reuses it and only the
    # per-round rng differs.
    config, _metadata = TEMPLATES[template](
        metric, effect_scale, K=K, seed=seed, **template_kwargs
    )
    built_k = config.metric_mode.n_treatments
    if built_k != K:
        raise ValueError(
            f"template {template!r} did not honor K={K}: the built config "
            f"carries {built_k} treatment levels. Only templates with "
            "multi-arm support accept K (currently 'clustered_realistic')."
        )

    def generator(
        round_idx: int, plan: NextRoundPlan
    ) -> tuple[ObservedExperimentData, CalibrationTruth | None]:
        """Synthesize one round of observed data and truth for ``plan``."""
        n = plan.n_visitors
        if n is None:
            raise ValueError(
                "plan.n_visitors is None — the sim adapter needs the round's "
                "visitor count to generate data."
            )
        # One independent stream per (seed, round): replaying a round with
        # the same plan reproduces it exactly.
        rng = np.random.default_rng(np.random.SeedSequence((seed, round_idx)))
        features = sample_features(config.feature_sampler, rng, n)
        truth_result = compute_potential_outcomes(
            features, config.metric_mode, revenue_model=config.revenue_model
        )
        # cluster_id is latent sampler structure, never observed (mirrors
        # generate_v2_core).
        features = features.drop(columns=["cluster_id"], errors="ignore")

        # Cell weights are contract-valid within 1e-6; rng.choice demands a
        # tighter sum, so normalize. The SAME normalized weights feed the
        # propensity blend below, so the recorded propensities match the
        # distribution the cells are actually drawn from.
        cells = plan.cells
        raw_weights = np.asarray([cell.weight for cell in cells], dtype=float)
        cell_probabilities = raw_weights / raw_weights.sum()
        cell_index = rng.choice(len(cells), size=n, p=cell_probabilities)

        encoded = _encode_features(features, plan)

        # Blend first: it consumes no rng and surfaces the curated
        # unmapped-leaf/custom-policy errors before any assignment draw.
        blend = np.zeros((n, K), dtype=np.float64)
        for cell, share in zip(cells, cell_probabilities, strict=True):
            blend += share * _policy_probabilities(cell.policy, encoded, K)

        # Per-visitor draws stay sequential and in generation order so the
        # rng stream (and hence the round) is reproducible.
        policies = [cell.policy for cell in cells]
        treatment_assignment = np.empty(n, dtype=np.int64)
        for i, chosen_cell in enumerate(cell_index):
            treatment_assignment[i] = policies[chosen_cell].assign(
                encoded[i], rng
            )

        variants = assign_and_observe(
            features,
            truth_result,
            config.assignment,
            config.metric_mode.metric_id,
            rng,
            revenue_model=config.revenue_model,
            treatment_assignment=treatment_assignment,
        )
        bundle = build_bundle(
            variants=variants,
            truth_result=truth_result,
            metric_mode=config.metric_mode,
            experiment_id=config.experiment_id,
        )
        observed = bundle.observed

        if names is not None:
            # Presentation-only rename: the per-row "variant" column and the
            # variant's own name are both switched to the caller's name.
            renamed = []
            for variant, new_name in zip(
                observed.variants, names, strict=True
            ):
                variant.visitors["variant"] = new_name
                renamed.append(dataclasses.replace(variant, name=new_name))
            observed = dataclasses.replace(observed, variants=renamed)

        # assign_and_observe partitions visitors by arm, keeping each arm's
        # rows in original feature-frame order — the same index partition
        # aligns the recorded columns and re-orders the truth arrays from
        # generation order into the round's concat order (the Generator
        # contract: truth rides with its visitor's concatenated row).
        cell_ids = np.array([cell.id for cell in cells], dtype=object)
        partition: list[np.ndarray] = []
        for k, variant in enumerate(observed.variants):
            idx = np.flatnonzero(treatment_assignment == k)
            partition.append(idx)
            variant.visitors[RESERVED_CELL_COLUMN] = cell_ids[cell_index[idx]]
            for arm in range(1, K):
                variant.visitors[f"{RESERVED_PROPENSITY_PREFIX}{arm}"] = blend[
                    idx, arm
                ]
        concat_order = np.concatenate(partition)
        return observed, _permute_truth(bundle.truth, concat_order)

    return generator
def _permute_truth(
    truth: CalibrationTruth, concat_order: np.ndarray
) -> CalibrationTruth:
    """Return ``truth`` with per-visitor arrays re-ordered into concat order.

    ``build_bundle`` emits truth aligned with the GENERATION-order feature
    frame, while ``assign_and_observe`` partitions the observed rows by
    arm, so row ``j`` of the concatenated variants is generation row
    ``concat_order[j]``. The ``AlignedVisitorArray`` contract pairs
    ``values[j]`` with concatenated ``visitors.iloc[j]``; every
    per-visitor array is therefore indexed through ``concat_order``.
    Fields not listed below (scalars, which are permutation-invariant)
    are carried over unchanged by ``dataclasses.replace``.

    Args:
        truth: Truth bundle whose per-visitor arrays are in generation
            order.
        concat_order: Generation-row index for each concatenated row.

    Returns:
        A copy of ``truth`` with the listed fields gathered through
        ``concat_order``; ``None`` fields stay ``None``.
    """
    # Fields holding a single optional AlignedVisitorArray.
    single_fields = (
        "cate_per_visitor",
        "conv_cate_per_visitor",
        "aov_cate_per_visitor",
        "p0_per_visitor",
        "p1_per_visitor",
        "m0_per_visitor",
        "m1_per_visitor",
    )
    # Fields holding an optional list of AlignedVisitorArray.
    list_fields = (
        "contrast_cate_per_visitor",
        "p_per_visitor",
        "m_per_visitor",
    )

    def reorder(
        array: AlignedVisitorArray | None,
    ) -> AlignedVisitorArray | None:
        # Absent truth stays absent; present truth is gathered row-wise.
        if array is None:
            return None
        return AlignedVisitorArray(
            values=np.asarray(array.values)[concat_order],
            n_visitors=array.n_visitors,
        )

    def reorder_each(
        arrays: list[AlignedVisitorArray] | None,
    ) -> list[AlignedVisitorArray] | None:
        if arrays is None:
            return None
        moved = [reorder(item) for item in arrays]
        # List entries are never None; the filter narrows the type.
        assert all(item is not None for item in moved)
        return [item for item in moved if item is not None]

    updates: dict[str, object] = {
        name: reorder(getattr(truth, name)) for name in single_fields
    }
    updates.update(
        (name, reorder_each(getattr(truth, name))) for name in list_fields
    )
    return dataclasses.replace(truth, **updates)
def _encode_features(features: pd.DataFrame, plan: NextRoundPlan) -> np.ndarray:
    """Encode a round's features into the float matrix policies route on.

    When any plan cell carries a ``TreePolicy``, the encoding must match
    the one the tree was trained on: column names come from
    ``extract_fit_arrays(plan.tree.observed)`` (the one encoding source of
    truth) and the fresh features are dummy-encoded and reindexed to those
    names (missing dummies → 0, novel categories dropped — unseen by the
    tree).

    On that path the fresh frame is encoded with ALL category levels kept.
    ``drop_first=True`` drops the first level *present in the fresh data*;
    when the training baseline level happens to be absent from a round,
    that would drop a level the tree has a column for, and the reindex
    would then silently zero-fill it (encoding those rows as baseline).
    Keeping every level and letting the reindex select the training
    columns gives the same matrix whenever all levels are present and the
    correct one when they are not.

    Without tree cells the policies ignore features, so any deterministic
    float encoding serves; that path keeps the ``drop_first=True``
    encoding with columns sorted by name.

    Args:
        features: The round's observed feature frame.
        plan: The round's plan; ``plan.cells`` decides which path is
            taken and ``plan.tree`` supplies the training encoding.

    Returns:
        A ``float64`` matrix with one row per feature-frame row.

    Raises:
        ValueError: When a ``TreePolicy`` cell is present but ``plan.tree``
            is None — the routing encoding cannot be derived without the
            plan's tree.
    """
    has_tree_cell = any(
        isinstance(cell.policy, TreePolicy) for cell in plan.cells
    )

    if not has_tree_cell:
        dummies = pd.get_dummies(features, drop_first=True).astype(float)
        dummies = dummies.reindex(columns=sorted(dummies.columns))
        return dummies.to_numpy(dtype=np.float64)

    if plan.tree is None:
        raise ValueError(
            "plan carries TreePolicy cell(s) but plan.tree is None — the "
            "plan's tree (PolicyTreeResult) is required to derive the "
            "routing encoding."
        )
    feature_names = extract_fit_arrays(plan.tree.observed).feature_names
    # Keep every level here: which column is "first" depends on the levels
    # present in THIS round, so the training feature names (not drop_first)
    # must decide which dummy columns survive.
    dummies = pd.get_dummies(features, drop_first=False).astype(float)
    dummies = dummies.reindex(columns=list(feature_names), fill_value=0.0)
    return dummies.to_numpy(dtype=np.float64)
def _policy_probabilities(
    policy: Policy, encoded: np.ndarray, K: int
) -> np.ndarray:
    """Return per-visitor assignment probabilities ``P_policy(k | x)``.

    The result has shape ``(len(encoded), K)``. Per supported policy type:

    - ``BaselinePolicy`` — a point mass on index 0.
    - ``UniformPolicy`` — ``1/len(over)`` added for each name in ``over``,
      at that name's position in ``treatments`` (or in ``over`` itself
      when ``treatments`` is None).
    - ``TreePolicy`` — each row takes the weights of
      ``allocation_map[leaf(x)]``, placed at each name's position in the
      policy's universe; the leaf lookup is one vectorized
      ``tree.apply`` call and draws no random numbers.

    Args:
        policy: The cell's policy.
        encoded: Encoded feature matrix, one row per visitor.
        K: Number of treatment levels (columns of the result).

    Returns:
        A ``float64`` array of shape ``(len(encoded), K)``.

    Raises:
        ValueError: For a custom policy type (the adapter cannot derive
            assignment probabilities it does not define), or when a row
            routes to a leaf the allocation map does not cover.
    """
    n_rows = len(encoded)
    table = np.zeros((n_rows, K), dtype=np.float64)

    if isinstance(policy, BaselinePolicy):
        table[:, 0] = 1.0
        return table

    if isinstance(policy, UniformPolicy):
        if policy.treatments is None:
            uniform_universe = policy.over
        else:
            uniform_universe = policy.treatments
        for arm_name in policy.over:
            column = uniform_universe.index(arm_name)
            table[:, column] += 1.0 / len(policy.over)
        return table

    if isinstance(policy, TreePolicy):
        leaf_of_row = policy.tree.apply(encoded)
        tree_universe = policy._universe()
        # Every leaf reached this round must have an allocation entry.
        reached = set(np.unique(leaf_of_row).tolist())
        unmapped = sorted(reached - set(policy.allocation_map))
        if unmapped:
            raise ValueError(
                f"the round's features route to leaf id(s) {unmapped} that "
                "the TreePolicy allocation_map does not cover"
            )
        for leaf_id, arm_weights in policy.allocation_map.items():
            rows_in_leaf = leaf_of_row == leaf_id
            if rows_in_leaf.any():
                leaf_row = np.zeros(K, dtype=np.float64)
                for arm_name, arm_weight in arm_weights.items():
                    leaf_row[tree_universe.index(arm_name)] = arm_weight
                table[rows_in_leaf] = leaf_row
        return table

    raise ValueError(
        f"Cannot derive assignment propensities for custom policy type "
        f"{type(policy).__name__!r}; the sim adapter supports "
        "BaselinePolicy, UniformPolicy, and TreePolicy."
    )