Source code for pytyche.experiment.recommendation

"""Recommendation engine: the per-round ``NextRoundPlan`` construction.

The engine is a thin composition over the shipped L2 primitives: the
policy tree (already carrying the ε-clipped Thompson ``allocation_map``)
comes from ``posterior.fit_policy_tree``, and graduation evidence from
``posterior.recommendation_summary`` — nothing here reimplements the
allocation or decision rules (design.md §"Recommendation engine").

Prose templates: every operator-facing phrase the engine emits lives in
the "Prose templates" section below — deterministic, no LLM, no
timestamps.
"""

from __future__ import annotations

import dataclasses
import html
from collections.abc import Sequence
from typing import TYPE_CHECKING, Protocol, runtime_checkable

import numpy as np

from pytyche.bcf.config import compute_num_trees_tau
from pytyche.experiment.cells import (
    BaselinePolicy,
    Cell,
    TreePolicy,
    UniformPolicy,
    validate_cell_weights,
)
from pytyche.experiment.experiment import Experiment

if TYPE_CHECKING:
    from pytyche.analysis import PolicyTreeResult
    from pytyche.bcf.config import HurdleBCFResult
    from pytyche.contracts import (
        AnalysisResult,
        DiscoveredSegment,
        RecommendationSummary,
    )
    from pytyche.experiment.schedule import Schedule

# Names exported by ``from pytyche.experiment.recommendation import *``.
# Underscore-prefixed helpers defined in this module are deliberately
# left out.
__all__ = [
    "ExpectedLossRule",
    "GraduationCandidate",
    "GraduationRule",
    "NextRoundPlan",
    "cold_start_plan",
    "detect_dropped_treatments",
    "next_fit_num_trees_tau",
    "recommend",
]

#: A segment is credible when its bootstrap stability clears this score —
#: the same 0.80 cutoff as ``posterior.has_credible_segments()``.
#: Compared with ``>=`` against each policy-tree stability score when the
#: engine picks the confidence level of its prose summary.
_CREDIBLE_STABILITY = 0.80


@runtime_checkable
class GraduationRule(Protocol):
    """Structural interface for graduation decision rules.

    A rule answers one question: is this (treatment, segment) pair a
    graduation candidate right now?

    The ``history`` handed to :meth:`is_candidate` is ordered oldest-first
    and its LAST element is the round under evaluation — the engine
    appends a provisional
    :class:`~pytyche.experiment.experiment.Experiment` for the current
    round before it consults the rule.

    :class:`ExpectedLossRule` is the default implementation.
    """

    def is_candidate(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> bool:
        ...
@dataclasses.dataclass(frozen=True)
class ExpectedLossRule:
    """Default graduation rule: sustained compound-threshold evidence.

    A (treatment, segment) pair is a candidate when its per-round decision
    evidence cleared all three thresholds —
    ``expected_loss_comparison < expected_loss_max`` AND
    ``probability_positive > p_positive_threshold`` AND
    ``probability_better > p_better_threshold`` — in the LAST
    ``sustained_rounds`` consecutive rounds of the history (this round and
    the ``sustained_rounds - 1`` before it). All three comparisons are
    strict.

    The evidence is recomputed per round from each history entry's stored
    posterior via
    ``experiment.posterior.recommendation_summary(treatment, segment=segment)``
    — array math on stored draws, no extra state.

    Graduation runs on RAW posterior probabilities in v0.2: the
    calibration artifact's correction scope is intervals-only, so the
    probability and expected-loss inputs here come from raw posterior
    draws even when a calibration artifact is attached. When the
    calibration track's p-curve extension lands, graduation becomes
    calibrated with no API change here.

    The defaults are the canonical thresholds; calibrate to your domain.

    Raises:
        ValueError: When ``sustained_rounds`` is below 1. Zero or negative
            would make every pair a candidate with no supporting evidence.
    """

    expected_loss_max: float = 0.005
    p_positive_threshold: float = 0.95
    p_better_threshold: float = 0.80
    sustained_rounds: int = 2

    def __post_init__(self) -> None:
        # ``held >= sustained_rounds`` is vacuously true for values < 1, so
        # reject the misconfiguration up front instead of graduating
        # everything silently.
        if self.sustained_rounds < 1:
            raise ValueError(
                f"sustained_rounds must be >= 1; got {self.sustained_rounds}"
            )

    def is_candidate(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> bool:
        """Whether the thresholds held in the last ``sustained_rounds``.

        Only the trailing ``sustained_rounds`` entries are inspected: the
        scan stops as soon as enough passing rounds have been seen, so
        older rounds' summaries are not recomputed.
        """
        needed = self.sustained_rounds
        return (
            self._trailing_held(treatment, segment, history, limit=needed)
            >= needed
        )

    def consecutive_held(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> int:
        """Count of trailing rounds whose evidence cleared the thresholds.

        The engine records this on :class:`GraduationCandidate` — a pair
        holding five consecutive rounds reports 5, not the rule's
        configured minimum.
        """
        return self._trailing_held(treatment, segment, history, limit=None)

    def _trailing_held(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
        limit: int | None,
    ) -> int:
        """Count passing rounds from the newest backwards, up to *limit*.

        ``limit=None`` counts the whole trailing run. Counting stops at the
        first round that fails the thresholds.
        """
        held = 0
        for experiment in reversed(history):
            if limit is not None and held >= limit:
                break
            if not self._passes(experiment, treatment, segment):
                break
            held += 1
        return held

    def _passes(
        self,
        experiment: Experiment,
        treatment: str,
        segment: DiscoveredSegment,
    ) -> bool:
        """Whether one round's evidence clears all three thresholds."""
        summary = experiment.posterior.recommendation_summary(
            treatment, segment=segment
        )
        return (
            summary.expected_loss_comparison < self.expected_loss_max
            and summary.probability_positive > self.p_positive_threshold
            and summary.probability_better > self.p_better_threshold
        )
@dataclasses.dataclass(frozen=True)
class GraduationCandidate:
    """One (treatment, segment) pair for which the graduation rule fired.

    This is structured data only: the library never graduates anything by
    itself, and the decision to ship stays with the operator (or an
    automated workflow acting for them).

    Fields:
        treatment: Name of the treatment.
        segment: Discovered segment in which graduation fired.
        sustained_rounds: How many consecutive rounds the rule has held.
        latest_recommendation: Decision evidence for this
            (treatment, segment) pair from the current round.
    """

    treatment: str
    segment: DiscoveredSegment
    sustained_rounds: int
    latest_recommendation: RecommendationSummary

    def __repr__(self) -> str:
        summary = self.latest_recommendation
        # Three display lines: identity, ship-risk evidence, value of waiting.
        headline = (
            f"GraduationCandidate — {self.treatment!r} @ "
            f"{self.segment.rule.description}, "
            f"sustained {self.sustained_rounds} round(s)"
        )
        ship_risk = (
            f" expected loss if shipped: "
            f"{summary.expected_loss_comparison:.4f}/visitor"
            f" P(lift > 0) = {summary.probability_positive:.2f}"
        )
        wait_value = (
            f" value of one more round: "
            f"{summary.expected_value_of_one_more_round:.4f}/visitor"
        )
        return "\n".join((headline, ship_risk, wait_value))
@dataclasses.dataclass(frozen=True)
class NextRoundPlan:
    """What the recommendation engine proposes for the upcoming round.

    Exactly ONE cell structure is proposed per round. The operator is free
    to accept it, override parts of it, or replace it outright before
    shipping.

    Fields:
        n_visitors: ``schedule.next_round_size(rounds_completed)``;
            ``None`` once the schedule is exhausted (the final round's
            plan has no following round to size).
        cells: Recommended cell structure (weights sum to 1.0).
        treatments: Treatments still active going into the next round.
        dropped_treatments: Treatments dropped between this round and the
            next; must not overlap ``treatments``.
        graduation_candidates: (treatment, segment) pairs that satisfied
            the graduation rule. Their treatments remain active — nothing
            is auto-graduated.
        prose_summary: Multi-paragraph, PM-readable rationale
            (template-formatted, deterministic).
        tree: The policy-tree fit for the round, from which the Optimized
            cell was built — segments, stability scores, allocation map.
            ``None`` only for the round-0 cold-start plan, where no fit
            exists yet. Consumers that need the round's tree (the truth
            comparison, next-fit ``num_trees_tau`` sizing) read it from
            here instead of refitting.

    Raises:
        ValueError: When a name appears in both ``treatments`` and
            ``dropped_treatments``. Cell weights are additionally checked
            by ``validate_cell_weights``.
    """

    n_visitors: int | None
    cells: list[Cell]
    treatments: list[str]
    dropped_treatments: list[str]
    graduation_candidates: list[GraduationCandidate]
    prose_summary: str
    tree: PolicyTreeResult | None = None

    def __post_init__(self) -> None:
        validate_cell_weights(self.cells)
        overlap = sorted(
            set(self.treatments).intersection(self.dropped_treatments)
        )
        if not overlap:
            return
        raise ValueError(
            f"dropped_treatments must be disjoint from treatments; "
            f"{overlap} appear in both"
        )

    def _size_text(self) -> str:
        """Human-readable size of the next round."""
        if self.n_visitors is not None:
            return f"{self.n_visitors:,} visitors"
        return "no next round (schedule exhausted)"

    def _headline(self) -> str:
        """First display line, shared by the text and HTML reprs."""
        return f"NextRoundPlan — {self._size_text()}, {len(self.cells)} cell(s)"

    def _footer_lines(self) -> list[str]:
        """Optional trailing notes: drops, candidate count, cold start."""
        notes: list[str] = []
        if self.dropped_treatments:
            notes.append(f"dropped: {self.dropped_treatments}")
        if self.graduation_candidates:
            notes.append(
                f"graduation candidates: {len(self.graduation_candidates)}"
            )
        if self.tree is None:
            notes.append("cold start — no fitted tree yet")
        return notes

    def __repr__(self) -> str:
        rows = [self._headline()]
        for cell in self.cells:
            rows.append(
                f" {cell.id:<12} weight {cell.weight:.2f} "
                f"{cell.policy.describe()}"
            )
        for note in self._footer_lines():
            rows.append(f" {note}")
        return "\n".join(rows)

    def _repr_html_(self) -> str:
        # pandas is imported lazily: only rich (notebook) display needs it.
        import pandas as pd

        table_rows = []
        for cell in self.cells:
            table_rows.append(
                {
                    "cell": cell.id,
                    "weight": f"{cell.weight:.2f}",
                    "policy": cell.policy.describe(),
                }
            )
        table_html = pd.DataFrame(table_rows).to_html(index=False, border=0)

        fragments = [f"<div><b>{html.escape(self._headline())}</b>", table_html]
        for note in self._footer_lines():
            fragments.append(f"<p>{html.escape(note)}</p>")
        fragments.append("</div>")
        return "".join(fragments)
def cold_start_plan(
    treatments: list[str], n_visitors: int | None
) -> NextRoundPlan:
    """Build the default round-0 plan: Control and Explore, 50/50.

    Before the first fit there is no posterior to derive a tree from, so
    the plan carries no Optimized cell. The Control cell measures the
    baseline cleanly; the Explore cell feeds the first fit uniform-random
    signal.

    Args:
        treatments: The experiment's full treatments universe (control
            first).
        n_visitors: Size of the first round per the schedule; ``None``
            when the schedule offers no rounds.

    Returns:
        The default :class:`NextRoundPlan` for round 0. Its ``tree`` is
        left at the default ``None``.
    """
    control_cell = Cell(id="control", policy=BaselinePolicy(), weight=0.5)
    explore_cell = Cell(
        id="explore", policy=UniformPolicy(over=treatments), weight=0.5
    )
    return NextRoundPlan(
        n_visitors=n_visitors,
        cells=[control_cell, explore_cell],
        # Copy so the plan does not alias the caller's list.
        treatments=list(treatments),
        dropped_treatments=[],
        graduation_candidates=[],
        prose_summary=_COLD_START_PROSE,
    )
def next_fit_num_trees_tau(
    posterior: HurdleBCFResult,
    tree_result: PolicyTreeResult,
    cumulative_n: int,
) -> int:
    """Size the next fit's ``num_trees_tau`` from this round's evidence.

    Delegates to ``compute_num_trees_tau`` at its defaults with two
    derived inputs:

    * ``d_tau`` — the number of distinct split features in the fitted
      policy tree, floored at 1 (a root-only tree still has one effective
      dimension);
    * ``sigma_tau`` — the standard deviation of the posterior-mean CATEs.

    Args:
        posterior: This round's hurdle posterior.
        tree_result: This round's policy-tree fit.
        cumulative_n: Total visitors across all rounds including the next
            one being sized.

    Returns:
        Tree count for the next fit's ``GPUBCFConfig.num_trees_tau``.
    """
    node_features = np.asarray(tree_result.tree.tree_.feature)
    # Negative entries are excluded: only real split features count.
    used_features = {*node_features[node_features >= 0].tolist()}
    d_tau = max(1, len(used_features))

    # Averaging over axis 1 — presumably the posterior-draw axis; confirm
    # against the layout of ``rpv_cate_samples``.
    posterior_mean_cate = np.asarray(posterior.rpv_cate_samples).mean(axis=1)
    sigma_tau = float(posterior_mean_cate.std())

    return compute_num_trees_tau(
        cumulative_n, d_tau=d_tau, sigma_tau=sigma_tau
    )
def detect_dropped_treatments(
    allocation_history: Sequence[dict[int, dict[str, float]]],
    treatments: list[str],
    *,
    epsilon: float = 0.02,
    sustained_rounds: int = 2,
) -> list[str]:
    """Treatments whose allocation starved below *epsilon* everywhere.

    A treatment is dropped when its allocation is below *epsilon* in EVERY
    segment for the last *sustained_rounds* consecutive rounds. A name
    absent from a segment's weights counts as zero allocation.

    A round whose allocation map has no segments at all carries no
    evidence about any treatment, so it never counts as a starved round:
    if any of the inspected rounds is empty, nothing is dropped.

    Args:
        allocation_history: Per-round allocation maps
            (``{leaf_id: {treatment_name: weight}}``), oldest first.
        treatments: Names to test; the result preserves this order.
        epsilon: The starvation threshold (the Thompson clip floor).
        sustained_rounds: Consecutive starved rounds required; must be at
            least 1.

    Returns:
        Dropped names in *treatments* order; empty when the history is
        shorter than *sustained_rounds*.

    Raises:
        ValueError: When *sustained_rounds* is below 1.
    """
    # Guard first: ``seq[-0:]`` is the WHOLE sequence, and with an empty
    # history every treatment would be dropped vacuously.
    if sustained_rounds < 1:
        raise ValueError(
            f"sustained_rounds must be >= 1; got {sustained_rounds}"
        )
    if len(allocation_history) < sustained_rounds:
        return []

    recent = list(allocation_history)[-sustained_rounds:]
    # ``all()`` over zero segments is True, so an empty map would starve
    # every treatment by default — treat it as "no evidence" instead.
    if any(not round_map for round_map in recent):
        return []

    return [
        name
        for name in treatments
        if all(
            weights.get(name, 0.0) < epsilon
            for round_map in recent
            for weights in round_map.values()
        )
    ]
def recommend(
    analysis: AnalysisResult,
    *,
    history: list[Experiment],
    treatments: list[str],
    schedule: Schedule,
    min_control_weight: float = 0.05,
    min_explore_weight: float = 0.05,
    max_segment_depth: int = 3,
    min_segment_share: float = 0.10,
    graduation_rule: GraduationRule | None = None,
    seed: int = 0,
) -> NextRoundPlan:
    """Assemble the next round's :class:`NextRoundPlan` from this round.

    The engine only composes the shipped L2 primitives. The policy tree
    and its ε-clipped Thompson allocation come from
    ``analysis.posterior.fit_policy_tree(...)``; graduation evidence comes
    from ``posterior.recommendation_summary(...)``. The proposal is the
    Control + Explore + Optimized triple at the floor weights. The
    graduation rule is consulted for each
    (non-control treatment × segment) pair over
    ``[*history, <this round>]``, and treatments whose allocation starved
    below the floor for consecutive rounds leave the active list.

    Args:
        analysis: This round's analysis; the posterior rides on
            ``analysis.posterior``.
        history: Prior rounds' experiments, oldest first (this round is
            NOT included; the engine appends its own provisional view).
        treatments: The experiment's full treatments universe (control
            first).
        schedule: Sizes the next round via
            ``next_round_size(len(history) + 1)``.
        min_control_weight: Guaranteed Control-cell share.
        min_explore_weight: Guaranteed Explore-cell share.
        max_segment_depth: Policy-tree depth bound.
        min_segment_share: Minimum per-leaf population share.
        graduation_rule: ``None`` uses :class:`ExpectedLossRule` at its
            defaults.
        seed: Bootstrap seed for the policy tree's stability scores.

    Returns:
        The assembled :class:`NextRoundPlan`.

    Raises:
        ValueError: When ``min_control_weight + min_explore_weight >= 1.0``,
            or when the posterior carries no observed data.
    """
    floor_total = min_control_weight + min_explore_weight
    if floor_total >= 1.0:
        raise ValueError(
            f"min_control_weight + min_explore_weight must be < 1.0 to leave "
            f"room for the Optimized cell; got {min_control_weight} + "
            f"{min_explore_weight} = {floor_total}"
        )

    posterior = analysis.posterior
    observed = posterior.observed
    if observed is None:
        raise ValueError(
            "analysis.posterior carries no observed data (observed is None); "
            "the engine needs visitor rows to segment and allocate over"
        )

    fitted = posterior.fit_policy_tree(
        max_depth=max_segment_depth,
        min_segment_share=min_segment_share,
        bootstrap_seed=seed,
    )

    # Allocation maps oldest-first, ending with this round's fresh fit.
    # NOTE(review): prior rounds without an Optimized-cell map are skipped,
    # so "consecutive" downstream means consecutive among rounds that have
    # a map — confirm that spanning such gaps is intended.
    allocation_history = []
    for prior in history:
        prior_map = _optimized_allocation_map(prior)
        if prior_map is not None:
            allocation_history.append(prior_map)
    allocation_history.append(fitted.allocation_map)

    # Control is the permanent lift reference — never droppable, however
    # starved its Thompson allocation gets.
    control_name = observed.control_name
    droppable = [name for name in treatments if name != control_name]
    dropped = detect_dropped_treatments(allocation_history, droppable)
    active = [name for name in treatments if name not in dropped]

    control_cell = Cell(
        id="control", policy=BaselinePolicy(), weight=min_control_weight
    )
    explore_cell = Cell(
        id="explore",
        policy=UniformPolicy(over=active, treatments=treatments),
        weight=min_explore_weight,
    )
    optimized_cell = Cell(
        id="optimized",
        policy=TreePolicy(
            fitted.tree,
            fitted.allocation_map,
            treatments=treatments,
        ),
        # Whatever the two floors leave over goes to the Optimized cell.
        weight=1.0 - floor_total,
    )
    cells = [control_cell, explore_cell, optimized_cell]

    n_visitors = schedule.next_round_size(len(history) + 1)

    # Provisional record of the current round, so the rule sees it as the
    # last history element.
    this_round = Experiment(
        round_idx=len(history),
        posterior=posterior,
        analysis=analysis,
        cells_shipped=[],
        cell_observations=[],
        next_recommendation=None,
        truth_comparison=None,
    )
    history_view = [*history, this_round]

    rule: GraduationRule = (
        ExpectedLossRule() if graduation_rule is None else graduation_rule
    )

    candidates = []
    for name in active:
        if name == control_name:
            continue
        for segment in fitted.segments:
            if not rule.is_candidate(name, segment, history_view):
                continue
            candidates.append(
                GraduationCandidate(
                    treatment=name,
                    segment=segment,
                    sustained_rounds=_held_rounds(
                        rule, name, segment, history_view
                    ),
                    latest_recommendation=posterior.recommendation_summary(
                        name, segment=segment
                    ),
                )
            )

    has_credible = any(
        score >= _CREDIBLE_STABILITY
        for score in fitted.stability_scores.values()
    )
    if not has_credible:
        confidence = "low"
    elif candidates:
        confidence = "high"
    else:
        confidence = "medium"

    prose = _compose_prose(
        confidence=confidence,
        decision=analysis.recommendation.decision.name,
        n_visitors=n_visitors,
        cells=cells,
        n_segments=len(fitted.segments),
        candidates=candidates,
        dropped=dropped,
    )

    return NextRoundPlan(
        n_visitors=n_visitors,
        cells=cells,
        treatments=active,
        dropped_treatments=dropped,
        graduation_candidates=candidates,
        prose_summary=prose,
        tree=fitted,
    )
@runtime_checkable
class _SupportsConsecutiveHeld(Protocol):
    """Optional capability: a rule that can report its own held count."""

    def consecutive_held(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> int:
        ...


def _held_rounds(
    rule: GraduationRule,
    treatment: str,
    segment: DiscoveredSegment,
    history_view: list[Experiment],
) -> int:
    """How many consecutive rounds the rule has actually held.

    A rule that exposes ``consecutive_held`` (the default
    :class:`ExpectedLossRule` does) is asked directly. Any other rule only
    guarantees a fired / not-fired answer, so the count is reconstructed
    by replaying the rule on ever-shorter prefixes of the history and
    counting the trailing prefixes at whose end it fired (>= 1 when the
    rule fired on the full history).
    """
    if isinstance(rule, _SupportsConsecutiveHeld):
        return int(rule.consecutive_held(treatment, segment, history_view))

    count = 0
    end = len(history_view)
    # Peel one round off the end each time, until the rule stops firing.
    while end > 0 and rule.is_candidate(
        treatment, segment, history_view[:end]
    ):
        count += 1
        end -= 1
    return count


def _optimized_allocation_map(
    experiment: Experiment,
) -> dict[int, dict[str, float]] | None:
    """Allocation map of a prior round's Optimized cell, if it has one.

    Looks at the plan stored on ``experiment.next_recommendation`` and
    returns the map of the first cell whose id starts with ``"optimized"``
    and whose policy is a ``TreePolicy``; ``None`` when there is no plan
    or no such cell.
    """
    plan = experiment.next_recommendation
    if plan is None:
        return None
    return next(
        (
            cell.policy.allocation_map
            for cell in plan.cells
            if cell.id.startswith("optimized")
            and isinstance(cell.policy, TreePolicy)
        ),
        None,
    )


# ---------------------------------------------------------------------------
# Prose templates — the single home of every operator-facing phrase the
# engine emits. Deterministic: no LLM, no randomness, no timestamps.
# --------------------------------------------------------------------------- _COLD_START_PROSE = ( "Round 1 starts from a cold start: no posterior exists yet, so there is " "no optimized policy to route traffic through.\n\n" "The plan splits traffic evenly between a Control cell (clean baseline " "measurement) and an Explore cell (uniform-random over the treatments) " "to give the first fit clean signal for effect discovery." ) _CONFIDENCE_OPENERS: dict[str, str] = { "high": ( "We have strong, sustained evidence: at least one discovered segment " "is stable across bootstrap refits, and a treatment has met the " "graduation thresholds in consecutive rounds." ), "medium": ( "Segment structure is firming up: at least one discovered segment is " "stable across bootstrap refits, but no treatment has sustained the " "graduation thresholds yet." ), "low": ( "Early signal only: no discovered segment is stable enough to act on " "yet, so the plan keeps exploring while the optimized policy " "concentrates traffic where the model currently expects value." ), } _DECISION_FRAGMENTS: dict[str, str] = { "SHIP": ( "This round's summary decision is SHIP — the leading treatment " "clears the decision thresholds on the current evidence." ), "CONTINUE": ( "This round's summary decision is CONTINUE — keep collecting data " "before committing." ), "STOP": ( "This round's summary decision is STOP — the evidence points to a " "harmful or futile treatment." ), } _DECISION_FALLBACK = "This round's summary decision is {name}." 
def _compose_prose(
    *,
    confidence: str,
    decision: str,
    n_visitors: int | None,
    cells: list[Cell],
    n_segments: int,
    candidates: list[GraduationCandidate],
    dropped: list[str],
) -> str:
    """Render the plan's prose summary from the module's templates.

    Always emits two paragraphs (confidence opener plus decision sentence,
    then next-round sizing and split). A graduation-candidates paragraph
    and a dropped-treatments paragraph follow only when those lists are
    non-empty. Paragraphs are separated by a blank line.

    Raises:
        KeyError: When *confidence* is not a key of
            ``_CONFIDENCE_OPENERS``.
    """
    split = ", ".join(f"{cell.id} {cell.weight:.0%}" for cell in cells)

    if n_visitors is None:
        sizing = "no further rounds (the schedule is exhausted)"
    else:
        sizing = f"{n_visitors:,} visitors"

    # Unknown decision names fall back to a generic sentence.
    if decision in _DECISION_FRAGMENTS:
        decision_fragment = _DECISION_FRAGMENTS[decision]
    else:
        decision_fragment = _DECISION_FALLBACK.format(name=decision)

    opener = _CONFIDENCE_OPENERS[confidence]
    paragraphs = [f"{opener} {decision_fragment}"]
    paragraphs.append(
        f"Next round: {sizing}, split {split}. The optimized cell routes "
        f"through a policy tree over {n_segments} discovered segment(s) "
        f"with per-segment Thompson allocation."
    )

    if candidates:
        described = []
        for candidate in candidates:
            described.append(
                f"{candidate.treatment} in segment {candidate.segment.id} "
                f"({candidate.segment.rule.description})"
            )
        pairs = "; ".join(described)
        paragraphs.append(
            f"Graduation candidates (surfaced for the operator — nothing is "
            f"auto-graduated): {pairs}."
        )

    if dropped:
        dropped_names = ", ".join(dropped)
        paragraphs.append(
            f"Dropped from the active list (allocation starved below the "
            f"floor in every segment for consecutive rounds): "
            f"{dropped_names}."
        )

    return "\n\n".join(paragraphs)