Source code for pytyche.experiment.sequential

"""The ``SequentialExperiment`` state machine — the L1 round-by-round loop.

Each ``advance()`` (equivalently ``next(exp)``) runs one round: size the
round from the schedule, hand the current
:class:`~pytyche.experiment.recommendation.NextRoundPlan` to the
generator, fit the joint hurdle BCF on the CUMULATIVE data, analyze,
score the round's cells, ask the recommendation engine for the next
plan, and (sim mode) evaluate against the cumulative ground truth. All
state commits atomically at the end of the round — a failed step leaves
``history`` untouched.

The cumulative row order is variant-major: all rounds of the first-seen
variant, then all rounds of the next. Per-round artifacts aligned to a
round's own concatenation (truth arrays, the ``cell`` column) are
re-indexed into that order via the position map ``_build_cumulative``
returns.
"""

from __future__ import annotations

import dataclasses
import sys
import warnings
from collections.abc import Callable
from typing import TYPE_CHECKING, Literal

import numpy as np
import pandas as pd

from pytyche.analysis import analyze, evaluate_against_truth
from pytyche.bcf.config import HurdleBCFResult
from pytyche.bcf.hurdle import fit_hurdle_bcf
from pytyche.contracts import (
    RESERVED_CELL_COLUMN,
    AlignedVisitorArray,
    CalibrationTruth,
    ObservedExperimentData,
    VariantData,
)
from pytyche.experiment.cells import Cell, validate_cell_weights
from pytyche.experiment.experiment import (
    Experiment,
    compute_cell_observations,
)
from pytyche.experiment.recommendation import (
    GraduationCandidate,
    GraduationRule,
    NextRoundPlan,
    cold_start_plan,
    next_fit_num_trees_tau,
    recommend,
)

if TYPE_CHECKING:
    from collections.abc import Sequence

    from pytyche.analysis import TruthComparison
    from pytyche.calibrate.artifact import Calibration
    from pytyche.contracts import AnalysisResult
    from pytyche.experiment.schedule import Schedule

# Names exported by ``from pytyche.experiment.sequential import *``; the
# cumulative-assembly helpers and the sentinel below stay private.
__all__ = [
    "Generator",
    "SequentialExperiment",
    "UncalibratedWarning",
    "sequential_experiment",
]

#: The single data-source contract. The loop invokes
#: ``generator(round_idx, plan)`` at the start of each round and receives
#: ``(observed, truth)`` — ``truth`` is non-None in sim mode and ``None``
#: in real-data mode (the mode may not flip between rounds). Every
#: returned visitors frame must carry the reserved ``cell`` column
#: recording each visitor's allocated cell id, and generators assigning
#: adaptively record per-visitor propensities in the reserved propensity
#: columns (``propensity`` at K = 2, ``propensity_1..propensity_{K-1}``
#: at K >= 3).
#:
#: Positional arguments, in order: the zero-based round index (``int``)
#: and the :class:`~pytyche.experiment.recommendation.NextRoundPlan` the
#: round ships.
Generator = Callable[
    [int, NextRoundPlan],
    tuple[ObservedExperimentData, CalibrationTruth | None],
]

#: Cell-assignment sentinel for prior rounds' rows — matches no plan cell,
#: so per-cell observations cover the current round's members only.
#: ``SequentialExperiment.advance`` raises ``ValueError`` for any plan
#: whose cells use this id, which is what keeps the sentinel unambiguous.
_PAST_ROUNDS_SENTINEL = "__past__"


class UncalibratedWarning(UserWarning):
    """Warning category for a loop that runs without a calibration artifact.

    A :class:`SequentialExperiment` built with ``calibration=None`` emits
    this warning a single time, on its first ``advance()`` call. Passing
    an SBC-fitted :class:`~pytyche.calibrate.artifact.Calibration` through
    the ``calibration=`` constructor parameter corrects interval coverage
    and keeps the warning from firing.
    """
class SequentialExperiment:
    """Stateful round-by-round adaptive experiment loop.

    The instance is its own iterator: ``next(exp)`` (or ``exp.advance()``)
    runs one round and returns the resulting
    :class:`~pytyche.experiment.experiment.Experiment` snapshot; iteration
    raises ``StopIteration`` when the schedule is exhausted. Configuration
    is locked at construction; per-round cell overrides go through
    ``advance(cells=...)``.

    Scope: this surface assumes designed experiments — the operator emits
    the assignment rules (the plan's cells and policies), the platform
    assigns per those rules, and per-visitor propensities are known and
    recorded. Heavily-confounded observational inference is out of scope
    for this API; for that setting, consider econml or DoubleML.

    Args:
        generator: ``generator(round_idx, plan)`` returning
            ``(observed, truth | None)`` — the single data-source
            contract. Sim-mode generators return ground truth; real-data
            generators return ``None`` (the mode may not flip between
            rounds). Every visitors frame must carry the reserved
            ``cell`` column recording each visitor's allocated cell id.
        schedule: Per-round visitor counts
            (:class:`~pytyche.experiment.schedule.Schedule` protocol).
        treatments: The full treatments universe, control first. Drops
            are recomputed from allocation history each round; dropped
            names never leave this universe.
        cells: Optional round-0 cell-structure override replacing the
            cold-start Control/Explore 50/50 split.
        min_control_weight: Guaranteed Control-cell share on
            engine-proposed plans.
        min_explore_weight: Guaranteed Explore-cell share on
            engine-proposed plans.
        max_segment_depth: Policy-tree depth bound for the per-round
            analysis and recommendation.
        min_segment_share: Minimum per-leaf population share.
        calibration: SBC-fitted artifact applied on-path each round, or
            ``None`` (uncalibrated; warns once per instance).
        graduation_rule: ``None`` uses
            :class:`~pytyche.experiment.recommendation.ExpectedLossRule`
            at its defaults.
        seed: Master seed — per-round fit seeds derive from it, and it
            seeds the policy tree's stability bootstrap.
        progress: When True, each round's fit renders tqdm progress bars
            on stderr (passed through as
            ``fit_hurdle_bcf(progress=True)``). Default False — silent.

    Raises:
        ValueError: When ``min_control_weight + min_explore_weight`` is
            not below 1.0 (no room for the Optimized cell).
    """

    def __init__(
        self,
        *,
        generator: Generator,
        schedule: Schedule,
        treatments: Sequence[str],
        cells: list[Cell] | None = None,
        min_control_weight: float = 0.05,
        min_explore_weight: float = 0.05,
        max_segment_depth: int = 3,
        min_segment_share: float = 0.10,
        calibration: Calibration | None = None,
        graduation_rule: GraduationRule | None = None,
        seed: int = 0,
        progress: bool = False,
    ) -> None:
        if min_control_weight + min_explore_weight >= 1.0:
            raise ValueError(
                f"min_control_weight + min_explore_weight must be < 1.0 to "
                f"leave room for the Optimized cell; got {min_control_weight}"
                f" + {min_explore_weight} = "
                f"{min_control_weight + min_explore_weight}"
            )
        if cells is not None:
            validate_cell_weights(cells)

        # Locked configuration (copied where mutable so later caller-side
        # edits cannot change the experiment).
        self._generator = generator
        self._schedule = schedule
        self._treatments = list(treatments)
        self._cells = list(cells) if cells is not None else None
        self._min_control_weight = min_control_weight
        self._min_explore_weight = min_explore_weight
        self._max_segment_depth = max_segment_depth
        self._min_segment_share = min_segment_share
        self._calibration = calibration
        self._graduation_rule = graduation_rule
        self._seed = seed
        self._progress = progress

        # Per-round state; the three lists grow in lockstep, one entry per
        # committed round.
        self._history: list[Experiment] = []
        self._round_data: list[ObservedExperimentData] = []
        self._round_truths: list[CalibrationTruth | None] = []
        self._current_plan: NextRoundPlan | None = None
        self._warned_uncalibrated = False

    # ------------------------------------------------------------------
    # Iteration
    # ------------------------------------------------------------------

    def __iter__(self) -> SequentialExperiment:
        return self

    def __next__(self) -> Experiment:
        return self.advance()

    def advance(self, cells: list[Cell] | None = None) -> Experiment:
        """Run one round and return its
        :class:`~pytyche.experiment.experiment.Experiment` snapshot.

        ``cells`` overrides this round's cell structure only; subsequent
        rounds revert to the recommendation engine's proposals. Overrides
        must satisfy the weight-sum invariant but are NOT subject to the
        engine's min-weight floors.

        All state commits only after every step succeeds — a failure at
        any step (missing ``cell`` column, calibration regime mismatch,
        ...) leaves ``history`` and the cumulative data unchanged. Every
        check on the round's data runs before the fit, so rejected data
        never costs a fit.

        Raises:
            StopIteration: When the schedule is exhausted.
            ValueError: When the override cells' weights do not sum to
                1.0, when round data lacks the reserved ``cell`` column
                or labels visitors with ids outside the round's plan,
                when a variant name falls outside the treatments
                universe, or when the round's identity
                (``experiment_id``, ``metric``, sim/real mode) conflicts
                with prior rounds.
        """
        round_idx = len(self._history)
        size = self._schedule.next_round_size(round_idx)
        if size is None:
            raise StopIteration

        if self._current_plan is None:
            plan = cold_start_plan(list(self._treatments), n_visitors=size)
            if self._cells is not None:
                plan = dataclasses.replace(plan, cells=list(self._cells))
        else:
            plan = self._current_plan
        if cells is not None:
            # Same check the constructor applies to its round-0 override;
            # without it the documented weight-sum contract on per-round
            # overrides was never enforced here.
            validate_cell_weights(cells)
            plan = dataclasses.replace(plan, cells=list(cells))
        if any(c.id == _PAST_ROUNDS_SENTINEL for c in plan.cells):
            raise ValueError(
                f"cell id {_PAST_ROUNDS_SENTINEL!r} is reserved for the "
                "loop's prior-round bookkeeping and cannot name a plan cell"
            )

        data, truth = self._generator(round_idx, plan)
        self._validate_round(data, truth)

        if self._calibration is None and not self._warned_uncalibrated:
            self._warned_uncalibrated = True
            # stacklevel=3 attributes past advance() AND __next__ at the
            # user's `next(exp)` line — the iteration path is the
            # documented surface, and library paths must not leak into
            # rendered notebook output. Direct advance() callers get
            # one frame higher; same compromise as NoCudaWarning.
            warnings.warn(
                "Running uncalibrated BCF posteriors (calibration=None); "
                "interval coverage may be miscalibrated at scale. Supply an "
                "SBC-fitted artifact via sequential_experiment("
                "calibration=...) to correct it.",
                UncalibratedWarning,
                stacklevel=3,
            )

        # Reject mislabelled rounds BEFORE the fit: the check only needs
        # the round's own data and plan.
        current_cells = self._plan_cell_labels(data, plan, round_idx)

        cumulative, positions = _build_cumulative([*self._round_data, data])
        n_total = sum(v.n_visitors for v in cumulative.variants)
        analysis = self._fit_round(cumulative, round_idx)
        posterior = analysis.posterior
        assert isinstance(posterior, HurdleBCFResult)  # the loop fits hurdle

        # Prior rounds' rows keep the sentinel; only this round's rows
        # (at positions[-1] in cumulative order) carry real cell ids.
        assignment = np.full(n_total, _PAST_ROUNDS_SENTINEL, dtype=object)
        assignment[positions[-1]] = current_cells
        cell_observations = compute_cell_observations(
            posterior, plan.cells, assignment
        )

        next_plan = recommend(
            analysis,
            history=list(self._history),
            treatments=list(self._treatments),
            schedule=self._schedule,
            min_control_weight=self._min_control_weight,
            min_explore_weight=self._min_explore_weight,
            max_segment_depth=self._max_segment_depth,
            min_segment_share=self._min_segment_share,
            graduation_rule=self._graduation_rule,
            seed=self._seed,
        )

        truth_comparison: TruthComparison | None
        if truth is None:
            truth_comparison = None
        else:
            sim_truths = [
                t for t in [*self._round_truths, truth] if t is not None
            ]
            cumulative_truth = _cumulative_truth(
                sim_truths, positions, n_total=n_total
            )
            assert next_plan.tree is not None  # engine plans carry the fit
            truth_comparison = evaluate_against_truth(
                posterior, next_plan.tree, cumulative_truth
            )

        experiment = Experiment(
            round_idx=round_idx,
            posterior=posterior,
            analysis=analysis,
            cells_shipped=list(plan.cells),
            cell_observations=cell_observations,
            next_recommendation=next_plan,
            truth_comparison=truth_comparison,
        )

        # Commit point: nothing above mutates history or cumulative data.
        self._round_data.append(data)
        self._round_truths.append(truth)
        self._history.append(experiment)
        self._current_plan = next_plan
        return experiment

    def run_to_completion(self) -> SequentialExperiment:
        """Advance until the schedule is exhausted or a candidate emerges.

        Returns ``self`` for chaining.

        Raises:
            ValueError: When the schedule is open-ended (``n_rounds`` is
                ``None``) — the operator must provide a stop condition.
        """
        if self._schedule.n_rounds is None:
            raise ValueError(
                "run_to_completion requires a bounded schedule, but "
                "schedule.n_rounds is None (open-ended); advance manually "
                "or supply a schedule with n_rounds set"
            )
        while not self.has_graduation_candidate():
            try:
                self.advance()
            except StopIteration:
                break
        return self

    # ------------------------------------------------------------------
    # Per-round fit (the documented test seam)
    # ------------------------------------------------------------------

    def _fit_round(
        self, cumulative: ObservedExperimentData, round_idx: int
    ) -> AnalysisResult:
        """Fit the cumulative data and analyze it — the loop's fit seam.

        The fit seed derives deterministically from the constructor seed
        and the round index via
        ``np.random.SeedSequence((seed, round_idx))`` — distinct rounds
        get decorrelated MCMC streams while the whole series stays
        reproducible from the one constructor seed.

        Round 0 fits at the config-default tau-forest size; later rounds
        size ``num_trees_tau`` from the previous round's evidence via
        :func:`~pytyche.experiment.recommendation.next_fit_num_trees_tau`.
        """
        fit_seed = int(
            np.random.SeedSequence((self._seed, round_idx)).generate_state(1)[0]
        )
        # Needed by both the progress banner and the tau-forest sizing.
        cumulative_n = sum(v.n_visitors for v in cumulative.variants)
        if self._progress:
            print(
                f"round {round_idx}: fitting {cumulative_n:,} cumulative visitors",
                file=sys.stderr,
                flush=True,
            )

        if round_idx > 0:
            prev = self._history[-1]
            prev_plan = prev.next_recommendation
            assert prev_plan is not None and prev_plan.tree is not None
            assert isinstance(prev.posterior, HurdleBCFResult)
            posterior = fit_hurdle_bcf(
                cumulative,
                calibration=self._calibration,
                seed=fit_seed,
                progress=self._progress,
                num_trees_tau=next_fit_num_trees_tau(
                    prev.posterior, prev_plan.tree, cumulative_n
                ),
            )
        else:
            posterior = fit_hurdle_bcf(
                cumulative,
                calibration=self._calibration,
                seed=fit_seed,
                progress=self._progress,
            )

        return analyze(
            posterior,
            max_depth=self._max_segment_depth,
            min_segment_share=self._min_segment_share,
            bootstrap_seed=self._seed,
        )

    # ------------------------------------------------------------------
    # Round-data validation
    # ------------------------------------------------------------------

    def _validate_round(
        self, data: ObservedExperimentData, truth: CalibrationTruth | None
    ) -> None:
        """Reject round data that breaks the loop's cross-round contract.

        Checks, in order: every variant frame has the reserved cell
        column; every variant name is in the treatments universe; and —
        from the second round on — ``experiment_id``, ``metric`` and the
        sim/real mode match the first committed round.

        Raises:
            ValueError: On the first violated check.
        """
        for variant in data.variants:
            if RESERVED_CELL_COLUMN not in variant.visitors.columns:
                raise ValueError(
                    f"round {len(self._history)} data for variant "
                    f"{variant.name!r} lacks the reserved "
                    f"{RESERVED_CELL_COLUMN!r} column; generators must "
                    "record each visitor's allocated cell id at generation "
                    "time (it is not derivable from the treatment received)"
                )

        rogue = [
            v.name for v in data.variants if v.name not in self._treatments
        ]
        if rogue:
            raise ValueError(
                f"round {len(self._history)} data carries variant names "
                f"{rogue} outside the treatments universe "
                f"{self._treatments}"
            )

        if not self._round_data:
            return  # first round: nothing to be consistent with yet

        first = self._round_data[0]
        if data.experiment_id != first.experiment_id:
            raise ValueError(
                f"round data experiment_id {data.experiment_id!r} does not "
                f"match the experiment's {first.experiment_id!r}"
            )
        if data.metric != first.metric:
            raise ValueError(
                f"round data metric {data.metric!r} does not match the "
                f"experiment's {first.metric!r}"
            )
        prior_sim = self._round_truths[0] is not None
        if (truth is not None) != prior_sim:
            raise ValueError(
                "sim/real mode may not flip between rounds: prior rounds "
                f"returned truth {'set' if prior_sim else 'None'} but this "
                f"round returned truth "
                f"{'set' if truth is not None else 'None'}"
            )

    @staticmethod
    def _plan_cell_labels(
        data: ObservedExperimentData, plan: NextRoundPlan, round_idx: int
    ) -> np.ndarray:
        """Return the round's per-visitor cell ids after checking them.

        The ids are concatenated across ``data.variants`` in the round's
        own variant order.

        Raises:
            ValueError: When any id matches no cell in *plan*.
        """
        labels = pd.concat(
            [v.visitors[RESERVED_CELL_COLUMN] for v in data.variants],
            ignore_index=True,
        ).to_numpy()
        # key=str keeps the sort total when stray labels mix types (for
        # example a missing value next to a string id), so the caller
        # sees the ValueError below rather than a TypeError from sorted().
        unknown = sorted(set(labels) - {c.id for c in plan.cells}, key=str)
        if unknown:
            raise ValueError(
                f"round {round_idx} {RESERVED_CELL_COLUMN!r} column carries "
                f"ids {unknown} that match no cell in the round's plan "
                f"{sorted(c.id for c in plan.cells)}; every visitor must be "
                "labeled with one of the plan's cell ids"
            )
        return labels

    # ------------------------------------------------------------------
    # History + state accessors
    # ------------------------------------------------------------------

    @property
    def history(self) -> list[Experiment]:
        """Completed rounds, oldest first (a defensive copy)."""
        return list(self._history)

    @property
    def latest(self) -> Experiment | None:
        """The most recent round, or ``None`` before the first."""
        return self._history[-1] if self._history else None

    @property
    def current_round_idx(self) -> int:
        """Number of completed rounds (the next round's index)."""
        return len(self._history)

    @property
    def next_recommendation(self) -> NextRoundPlan | None:
        """The engine's plan for the next round; ``None`` before round 0."""
        if not self._history:
            return None
        return self._history[-1].next_recommendation

    # ------------------------------------------------------------------
    # Capability methods (read-only)
    # ------------------------------------------------------------------

    def graduation_candidates(
        self, sustained_rounds: int = 2
    ) -> list[GraduationCandidate]:
        """Latest-round candidates sustained for at least *sustained_rounds*."""
        plan = self.next_recommendation
        if plan is None:
            return []
        return [
            candidate
            for candidate in plan.graduation_candidates
            if candidate.sustained_rounds >= sustained_rounds
        ]

    def has_graduation_candidate(self) -> bool:
        """Whether any candidate meets the default sustained-rounds bar."""
        return bool(self.graduation_candidates())

    def has_credible_segments(self, stability_threshold: float = 0.80) -> bool:
        """Whether any latest-round segment clears *stability_threshold*."""
        if not self._history:
            return False
        return any(
            segment.stability_score >= stability_threshold
            for segment in self._history[-1].analysis.segments
        )

    def dropped_treatments(self) -> list[str]:
        """Universe treatments absent from the latest plan's active list."""
        plan = self.next_recommendation
        if plan is None:
            return []
        return [
            name for name in self._treatments if name not in plan.treatments
        ]

    # ------------------------------------------------------------------
    # Series-level summary + confidence
    # ------------------------------------------------------------------

    @property
    def summary(self) -> str:
        """Deterministic multi-paragraph prose over the series so far."""
        if not self._history:
            return (
                "No rounds completed yet.\n\n"
                f"The experiment is configured over treatments "
                f"{self._treatments} and advances one round per "
                "next(exp) / advance() call."
            )

        latest = self._history[-1]
        plan = latest.next_recommendation
        assert plan is not None  # yielded experiments always carry a plan
        paragraphs = [
            (
                f"{len(self._history)} round(s) completed; confidence is "
                f"{self.confidence}. Latest {latest.summary_one_line()}"
            ),
            plan.prose_summary,
        ]

        candidates = self.graduation_candidates()
        if candidates:
            pairs = "; ".join(
                f"{candidate.treatment} in segment {candidate.segment.id}"
                for candidate in candidates
            )
            paragraphs.append(f"Graduation candidates: {pairs}.")

        dropped = self.dropped_treatments()
        if dropped:
            paragraphs.append(
                f"Dropped from the active list: {', '.join(dropped)}."
            )
        return "\n\n".join(paragraphs)

    @property
    def confidence(self) -> Literal["high", "medium", "low"]:
        """One-word label from the credibility + graduation state."""
        credible = self.has_credible_segments(0.80)
        if credible and self.has_graduation_candidate():
            return "high"
        if credible:
            return "medium"
        return "low"
def sequential_experiment(
    *,
    generator: Generator,
    schedule: Schedule,
    treatments: Sequence[str],
    cells: list[Cell] | None = None,
    min_control_weight: float = 0.05,
    min_explore_weight: float = 0.05,
    max_segment_depth: int = 3,
    min_segment_share: float = 0.10,
    calibration: Calibration | None = None,
    graduation_rule: GraduationRule | None = None,
    seed: int = 0,
    progress: bool = False,
) -> SequentialExperiment:
    """Build and return a :class:`SequentialExperiment` — the L1 entry point.

    A function-call spelling of the :class:`SequentialExperiment`
    constructor: it accepts the same keyword-only parameters with the
    same defaults and forwards every one unchanged. Step the returned
    instance one round at a time with ``next(exp)``, or call
    ``run_to_completion()`` when the schedule is bounded.

    Scope: this surface assumes designed experiments — the operator emits
    the assignment rules (the plan's cells and policies), the platform
    assigns per those rules, and per-visitor propensities are known and
    recorded. Heavily-confounded observational inference is out of scope
    for this API; for that setting, consider econml or DoubleML.
    """
    # Gather the configuration once, then hand it over verbatim.
    settings = {
        "generator": generator,
        "schedule": schedule,
        "treatments": treatments,
        "cells": cells,
        "min_control_weight": min_control_weight,
        "min_explore_weight": min_explore_weight,
        "max_segment_depth": max_segment_depth,
        "min_segment_share": min_segment_share,
        "calibration": calibration,
        "graduation_rule": graduation_rule,
        "seed": seed,
        "progress": progress,
    }
    return SequentialExperiment(**settings)
# ---------------------------------------------------------------------------
# Cumulative assembly helpers
# ---------------------------------------------------------------------------


def _build_cumulative(
    rounds: list[ObservedExperimentData],
) -> tuple[ObservedExperimentData, list[np.ndarray]]:
    """Merge every round's data into a single ``ObservedExperimentData``.

    Variant names are taken in first-seen order across the rounds, and
    each merged variant stacks that variant's per-round frames in round
    order. The merged row order is therefore VARIANT-MAJOR: every round
    of the first variant, then every round of the second, and so on.
    The summary counters (visitors, conversions, revenue) are summed
    over the rounds; ``experiment_id`` and ``metric`` come from the
    first round.

    Returns:
        ``(cumulative, positions)``. ``positions[r][i]`` is the merged
        row index of row *i* in round *r*'s own concatenation (that
        round's variants in its own order) — the lookup used to move
        round-aligned artifacts (truth arrays, the ``cell`` column) into
        merged order.
    """
    # name -> [(round index, variant), ...]; dict insertion order is the
    # first-seen variant order.
    by_name: dict[str, list[tuple[int, VariantData]]] = {}
    for round_no, round_data in enumerate(rounds):
        for variant in round_data.variants:
            by_name.setdefault(variant.name, []).append((round_no, variant))

    # First merged row of each (round, variant) slab, walking the merged
    # layout from top to bottom.
    slab_start: dict[tuple[int, str], int] = {}
    cursor = 0
    for name, members in by_name.items():
        for round_no, variant in members:
            slab_start[(round_no, name)] = cursor
            cursor += variant.n_visitors

    positions: list[np.ndarray] = []
    for round_no, round_data in enumerate(rounds):
        slabs = []
        for variant in round_data.variants:
            begin = slab_start[(round_no, variant.name)]
            slabs.append(np.arange(begin, begin + variant.n_visitors))
        positions.append(np.concatenate(slabs))

    merged_variants = []
    for name, members in by_name.items():
        pieces = [variant for _, variant in members]
        merged_variants.append(
            VariantData(
                name=name,
                visitors=pd.concat(
                    [piece.visitors for piece in pieces],
                    ignore_index=True,
                ),
                n_visitors=sum(piece.n_visitors for piece in pieces),
                n_conversions=sum(piece.n_conversions for piece in pieces),
                total_revenue=float(
                    sum(piece.total_revenue for piece in pieces)
                ),
            )
        )

    opening_round = rounds[0]
    merged = ObservedExperimentData(
        experiment_id=opening_round.experiment_id,
        metric=opening_round.metric,
        variants=merged_variants,
    )
    return merged, positions


def _cumulative_truth(
    truths: list[CalibrationTruth],
    positions: list[np.ndarray],
    *,
    n_total: int,
) -> CalibrationTruth:
    """Combine the rounds' truths into one cumulative ``CalibrationTruth``.

    Each per-visitor array is laid out in its round's own concatenation
    order and is scattered into the cumulative row order through
    *positions*. For the scalar fields, ``metric_id`` and
    ``metric_family`` have to agree in every round, while ``effect`` and
    every ``effect_components`` value become means weighted by each
    round's row count.

    Raises:
        ValueError: On metric identity mismatch, effect-component key
            mismatch, or a per-visitor field populated in some rounds
            but not others.
    """
    first = truths[0]
    for later in truths[1:]:
        if later.metric_id != first.metric_id:
            raise ValueError(
                f"truth metric_id changed between rounds: "
                f"{first.metric_id!r} vs {later.metric_id!r}"
            )
        if later.metric_family != first.metric_family:
            raise ValueError(
                f"truth metric_family changed between rounds: "
                f"{first.metric_family!r} vs {later.metric_family!r}"
            )
        if set(later.effect_components) != set(first.effect_components):
            raise ValueError(
                f"truth effect_components keys changed between rounds: "
                f"{sorted(first.effect_components)} vs "
                f"{sorted(later.effect_components)}"
            )

    # One weight per round: the number of rows that round contributed.
    round_rows = np.array([len(p) for p in positions], dtype=np.float64)
    all_rows = float(round_rows.sum())

    def weighted_mean(values: list[float]) -> float:
        numerator = sum(
            w * value for w, value in zip(round_rows, values, strict=True)
        )
        return float(numerator / all_rows)

    effect = weighted_mean([t.effect for t in truths])
    effect_components = {
        key: weighted_mean([t.effect_components[key] for t in truths])
        for key in first.effect_components
    }

    def stitch(arrays: list[AlignedVisitorArray]) -> AlignedVisitorArray:
        # Scatter each round's values to that round's cumulative rows.
        merged = np.empty(n_total, dtype=np.float64)
        for rows, array in zip(positions, arrays, strict=True):
            merged[rows] = np.asarray(array.values, dtype=np.float64)
        return AlignedVisitorArray(values=merged, n_visitors=n_total)

    def realign(field: str) -> AlignedVisitorArray | None:
        per_round = [getattr(t, field) for t in truths]
        absent = sum(1 for entry in per_round if entry is None)
        if absent == len(per_round):
            return None
        if absent:
            raise ValueError(
                f"truth field {field!r} is populated in some rounds but "
                "not others"
            )
        return stitch(per_round)

    def realign_list(field: str) -> list[AlignedVisitorArray] | None:
        per_round = [getattr(t, field) for t in truths]
        absent = sum(1 for entry in per_round if entry is None)
        if absent == len(per_round):
            return None
        if absent:
            raise ValueError(
                f"truth field {field!r} is populated in some rounds but "
                "not others"
            )
        lengths = {len(entry) for entry in per_round}
        if len(lengths) != 1:
            raise ValueError(
                f"truth field {field!r} length changed between rounds: "
                f"{sorted(lengths)}"
            )
        (width,) = lengths
        stitched = []
        for slot in range(width):
            stitched.append(stitch([entry[slot] for entry in per_round]))
        return stitched

    return CalibrationTruth(
        effect=effect,
        metric_id=first.metric_id,
        metric_family=first.metric_family,
        effect_components=effect_components,
        cate_per_visitor=realign("cate_per_visitor"),
        conv_cate_per_visitor=realign("conv_cate_per_visitor"),
        aov_cate_per_visitor=realign("aov_cate_per_visitor"),
        p0_per_visitor=realign("p0_per_visitor"),
        p1_per_visitor=realign("p1_per_visitor"),
        m0_per_visitor=realign("m0_per_visitor"),
        m1_per_visitor=realign("m1_per_visitor"),
        contrast_cate_per_visitor=realign_list("contrast_cate_per_visitor"),
        p_per_visitor=realign_list("p_per_visitor"),
        m_per_visitor=realign_list("m_per_visitor"),
    )