Source code for pytyche.experiment.simulate

"""Sim-mode generator adapter — pytyche DGP templates as a ``Generator``.

:func:`simulated_experiment_generator` wraps a registered
``pytyche.generators`` scenario template as a callable conforming to the
:data:`~pytyche.experiment.sequential.Generator` contract: the loop calls
``generator(round_idx, plan)`` and receives ``(observed, truth)`` with
non-None truth. Visitors are allocated to the plan's cells by weight and
each visitor's treatment is drawn from the cell's ``policy.assign`` —
``TreePolicy`` routing happens at data-generation time, driving the
template's ``assign_and_observe`` through its external
``treatment_assignment`` hook (never the fixed ``treatment_probabilities``
path).
"""

from __future__ import annotations

import dataclasses
from collections.abc import Sequence
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

from pytyche._internal.extraction import extract_fit_arrays
from pytyche.contracts import (
    RESERVED_CELL_COLUMN,
    RESERVED_PROPENSITY_PREFIX,
    AlignedVisitorArray,
    CalibrationTruth,
    ObservedExperimentData,
)
from pytyche.experiment.cells import (
    BaselinePolicy,
    Policy,
    TreePolicy,
    UniformPolicy,
)
from pytyche.generators.core import (
    assign_and_observe,
    build_bundle,
    compute_potential_outcomes,
    sample_features,
)
from pytyche.generators.scenarios import TEMPLATES

if TYPE_CHECKING:
    from pytyche.experiment.recommendation import NextRoundPlan
    from pytyche.experiment.sequential import Generator

__all__ = ["simulated_experiment_generator"]


def simulated_experiment_generator(
    *,
    template: str,
    metric: str,
    effect_scale: float,
    K: int,
    seed: int = 0,
    treatment_names: Sequence[str] | None = None,
    **template_kwargs: object,
) -> Generator:
    """Wrap a registered DGP template as a sim-mode ``Generator`` callable.

    The returned callable conforms to the single data-source contract
    (:data:`~pytyche.experiment.sequential.Generator`): invoked as
    ``generator(round_idx, plan)``, it synthesizes one round of observed
    data from the plan's cell structure and returns
    ``(ObservedExperimentData, CalibrationTruth)`` — truth is always
    non-None (sim mode). Generation is deterministic per
    ``(seed, round_idx)``: replaying a round with the same plan reproduces
    it exactly, while distinct rounds draw fresh visitors.

    Variant naming defaults to the generator core's: index 0 is
    ``"control"`` and indices 1..K-1 are ``"treatment_1"`` ..
    ``"treatment_{K-1}"``. ``treatment_names=`` renames the variants for the
    experiment's narrative (position 0 is the baseline; positions follow the
    template's arm order) — presentation-only, the positional truth arrays
    are untouched. A ``pt.sequential_experiment(...)`` driven by this
    adapter must pass the effective names as its ``treatments=`` universe,
    baseline first.

    Each returned visitors frame carries the reserved recording columns:

    - ``cell`` — the id of the plan cell that allocated the visitor
      (recorded at generation time; not derivable from the treatment
      received).
    - ``propensity_1`` .. ``propensity_{K-1}`` — the realized assignment
      propensities, i.e. the cell-weight blend
      ``P(Z = k | x, plan) = sum_c w_c * P_policy_c(k | x)``, where ``w_c``
      are the cell weights normalized to sum to one — the same
      probabilities the cell allocation draw uses.

    Real-data generators carry the same recording obligations: the loop
    requires the ``cell`` column, and the reserved propensity columns are
    the documented channel for adaptively-assigned designs (``propensity``
    at K = 2).

    The adapter is K >= 3 only: the generator core's external
    ``treatment_assignment`` hook exists only for multi-arm truth, and the
    K = 2 paired sampling path is pinned out of bounds. For a K = 2
    sequential experiment, write a custom generator instead.

    Args:
        template: Name of a registered scenario template — a key of
            ``pytyche.generators.scenarios.TEMPLATES``.
        metric: Canonical metric identifier passed to the template (e.g.
            ``"revenue_per_visitor"``).
        effect_scale: Treatment-effect amplitude passed to the template.
        K: Number of treatment levels (control + K-1 treatments). Must be
            >= 3.
        seed: Master seed. Round ``r`` draws from
            ``np.random.default_rng(np.random.SeedSequence((seed, r)))``.
        treatment_names: Optional K unique variant names replacing the
            template's, in template arm order (position 0 = baseline).
        **template_kwargs: Extra keyword arguments forwarded to the
            template (e.g. ``n_nuisance=``, ``baseline_rate=``).

    Returns:
        A ``Generator``-conforming callable.

    Raises:
        ValueError: When ``template`` is not a registered template name,
            when ``K < 3``, when ``treatment_names`` is not exactly K unique
            names, or when the built template config does not carry K
            treatment levels.
    """
    if template not in TEMPLATES:
        registered = ", ".join(sorted(TEMPLATES))
        raise ValueError(
            f"Unknown template {template!r}. Registered templates: {registered}"
        )
    if K < 3:
        raise ValueError(
            f"simulated_experiment_generator supports K >= 3 only; got K={K}. "
            "The generator core's external treatment_assignment hook exists "
            "only for multi-arm truth — for a K = 2 sequential experiment, "
            "write a custom generator instead."
        )
    names = list(treatment_names) if treatment_names is not None else None
    if names is not None:
        if len(names) != K:
            raise ValueError(
                f"treatment_names must carry exactly K={K} names; "
                f"got {len(names)}"
            )
        if len(set(names)) != K:
            raise ValueError("treatment_names must be unique")

    config, _metadata = TEMPLATES[template](
        metric, effect_scale, K=K, seed=seed, **template_kwargs
    )
    built_k = config.metric_mode.n_treatments
    if built_k != K:
        raise ValueError(
            f"template {template!r} did not honor K={K}: the built config "
            f"carries {built_k} treatment levels. Only templates with "
            "multi-arm support accept K (currently 'clustered_realistic')."
        )

    def generator(
        round_idx: int, plan: NextRoundPlan
    ) -> tuple[ObservedExperimentData, CalibrationTruth | None]:
        """Synthesize one round of observed data and truth for ``plan``."""
        n = plan.n_visitors
        if n is None:
            raise ValueError(
                "plan.n_visitors is None — the sim adapter needs the round's "
                "visitor count to generate data."
            )
        # One independent stream per (seed, round): replays are exact.
        rng = np.random.default_rng(np.random.SeedSequence((seed, round_idx)))
        features = sample_features(config.feature_sampler, rng, n)
        truth_result = compute_potential_outcomes(
            features, config.metric_mode, revenue_model=config.revenue_model
        )
        # cluster_id is latent sampler structure, never observed (mirrors
        # generate_v2_core).
        features = features.drop(columns=["cluster_id"], errors="ignore")

        # Cell weights are contract-valid within 1e-6; rng.choice demands a
        # tighter sum, so normalize. The normalized vector is the realized
        # allocation distribution and is reused for the propensity blend so
        # the recorded propensities match what was actually drawn.
        weights = np.asarray([cell.weight for cell in plan.cells], dtype=float)
        allocation = weights / weights.sum()
        cell_index = rng.choice(len(plan.cells), size=n, p=allocation)

        encoded = _encode_features(features, plan)
        # Blend first: it consumes no rng and surfaces the curated
        # unmapped-leaf/custom-policy errors before any assignment draw.
        blend = np.zeros((n, K), dtype=np.float64)
        for cell, share in zip(plan.cells, allocation, strict=True):
            blend += share * _policy_probabilities(cell.policy, encoded, K)

        # Per-visitor draws stay sequential: each policy.assign call consumes
        # the shared rng, so the draw order is part of the replay guarantee.
        policies = [cell.policy for cell in plan.cells]
        treatment_assignment = np.empty(n, dtype=np.int64)
        for i, allocated in enumerate(cell_index):
            treatment_assignment[i] = policies[allocated].assign(encoded[i], rng)

        variants = assign_and_observe(
            features,
            truth_result,
            config.assignment,
            config.metric_mode.metric_id,
            rng,
            revenue_model=config.revenue_model,
            treatment_assignment=treatment_assignment,
        )
        bundle = build_bundle(
            variants=variants,
            truth_result=truth_result,
            metric_mode=config.metric_mode,
            experiment_id=config.experiment_id,
        )

        observed = bundle.observed
        if names is not None:
            renamed = []
            for variant, new_name in zip(
                observed.variants, names, strict=True
            ):
                variant.visitors["variant"] = new_name
                renamed.append(dataclasses.replace(variant, name=new_name))
            observed = dataclasses.replace(observed, variants=renamed)

        # assign_and_observe partitions visitors by arm, keeping each arm's
        # rows in original feature-frame order — the same index partition
        # aligns the recorded columns and re-orders the truth arrays from
        # generation order into the round's concat order (the Generator
        # contract: truth rides with its visitor's concatenated row).
        cell_ids = np.array([cell.id for cell in plan.cells], dtype=object)
        partition: list[np.ndarray] = []
        for k, variant in enumerate(observed.variants):
            idx = np.flatnonzero(treatment_assignment == k)
            partition.append(idx)
            variant.visitors[RESERVED_CELL_COLUMN] = cell_ids[cell_index[idx]]
            for arm in range(1, K):
                variant.visitors[f"{RESERVED_PROPENSITY_PREFIX}{arm}"] = blend[
                    idx, arm
                ]
        concat_order = np.concatenate(partition)

        return observed, _permute_truth(bundle.truth, concat_order)

    return generator
def _permute_truth( truth: CalibrationTruth, concat_order: np.ndarray ) -> CalibrationTruth: """Re-order per-visitor truth arrays from generation to concat order. ``build_bundle`` emits truth aligned with the GENERATION-order feature frame, but ``assign_and_observe`` partitions the observed rows by arm — row ``j`` of the concatenated variants is generation row ``concat_order[j]``. The ``AlignedVisitorArray`` contract pairs ``values[j]`` with concatenated ``visitors.iloc[j]``, so every per-visitor array is gathered through ``concat_order``. Scalars are permutation-invariant and pass through. """ def gather( array: AlignedVisitorArray | None, ) -> AlignedVisitorArray | None: if array is None: return None return AlignedVisitorArray( values=np.asarray(array.values)[concat_order], n_visitors=array.n_visitors, ) def gather_list( arrays: list[AlignedVisitorArray] | None, ) -> list[AlignedVisitorArray] | None: if arrays is None: return None gathered = [gather(a) for a in arrays] assert all(a is not None for a in gathered) return [a for a in gathered if a is not None] return dataclasses.replace( truth, cate_per_visitor=gather(truth.cate_per_visitor), conv_cate_per_visitor=gather(truth.conv_cate_per_visitor), aov_cate_per_visitor=gather(truth.aov_cate_per_visitor), p0_per_visitor=gather(truth.p0_per_visitor), p1_per_visitor=gather(truth.p1_per_visitor), m0_per_visitor=gather(truth.m0_per_visitor), m1_per_visitor=gather(truth.m1_per_visitor), contrast_cate_per_visitor=gather_list(truth.contrast_cate_per_visitor), p_per_visitor=gather_list(truth.p_per_visitor), m_per_visitor=gather_list(truth.m_per_visitor), ) def _encode_features(features: pd.DataFrame, plan: NextRoundPlan) -> np.ndarray: """Encode a round's features into the float matrix policies route on. 
When any plan cell carries a ``TreePolicy``, the encoding must match the one the tree was trained on: column names come from ``extract_fit_arrays(plan.tree.observed)`` (the one encoding source of truth) and the fresh features are re-encoded with the same ``get_dummies`` recipe reindexed to those names (missing dummies → 0, novel categories dropped — unseen by the tree). Without tree cells the policies ignore features, so any deterministic float encoding serves. Raises: ValueError: When a ``TreePolicy`` cell is present but ``plan.tree`` is None — the routing encoding cannot be derived without the plan's tree. """ dummies = pd.get_dummies(features, drop_first=True).astype(float) if any(isinstance(cell.policy, TreePolicy) for cell in plan.cells): if plan.tree is None: raise ValueError( "plan carries TreePolicy cell(s) but plan.tree is None — the " "plan's tree (PolicyTreeResult) is required to derive the " "routing encoding." ) feature_names = extract_fit_arrays(plan.tree.observed).feature_names dummies = dummies.reindex(columns=list(feature_names), fill_value=0.0) else: dummies = dummies.reindex(columns=sorted(dummies.columns)) return dummies.to_numpy(dtype=np.float64) def _policy_probabilities( policy: Policy, encoded: np.ndarray, K: int ) -> np.ndarray: """Per-visitor assignment probabilities ``P_policy(k | x)``, shape (n, K). Index resolution mirrors each shipped policy's own ``assign``: ``BaselinePolicy`` is a point mass on index 0; ``UniformPolicy`` spreads ``1/len(over)`` over its over-set resolved against its ``treatments`` universe; ``TreePolicy`` looks up ``allocation_map[leaf(x)]`` against its universe (the leaf lookup is vectorized — arithmetic, not assignment). Raises: ValueError: For a custom policy type (the adapter cannot derive assignment probabilities it does not define), or when a row routes to a leaf the allocation map does not cover. 
""" n = len(encoded) probabilities = np.zeros((n, K), dtype=np.float64) if isinstance(policy, BaselinePolicy): probabilities[:, 0] = 1.0 elif isinstance(policy, UniformPolicy): universe = policy.treatments if policy.treatments is not None else policy.over for name in policy.over: probabilities[:, universe.index(name)] += 1.0 / len(policy.over) elif isinstance(policy, TreePolicy): leaves = policy.tree.apply(encoded) universe = policy._universe() unmapped = sorted(set(np.unique(leaves).tolist()) - set(policy.allocation_map)) if unmapped: raise ValueError( f"the round's features route to leaf id(s) {unmapped} that " "the TreePolicy allocation_map does not cover" ) for leaf_id, weights in policy.allocation_map.items(): mask = leaves == leaf_id if not mask.any(): continue row = np.zeros(K, dtype=np.float64) for name, weight in weights.items(): row[universe.index(name)] = weight probabilities[mask] = row else: raise ValueError( f"Cannot derive assignment propensities for custom policy type " f"{type(policy).__name__!r}; the sim adapter supports " "BaselinePolicy, UniformPolicy, and TreePolicy." ) return probabilities