"""Recommendation engine: the per-round ``NextRoundPlan`` construction.
The engine is a thin composition over the shipped L2 primitives: the
policy tree (already carrying the ε-clipped Thompson ``allocation_map``)
comes from ``posterior.fit_policy_tree``, and graduation evidence from
``posterior.recommendation_summary`` — nothing here reimplements the
allocation or decision rules (design.md §"Recommendation engine").
Prose templates: every operator-facing phrase the engine emits lives in
the "Prose templates" section below — deterministic, no LLM, no
timestamps.
"""
from __future__ import annotations
import dataclasses
import html
from collections.abc import Sequence
from typing import TYPE_CHECKING, Protocol, runtime_checkable
import numpy as np
from pytyche.bcf.config import compute_num_trees_tau
from pytyche.experiment.cells import (
BaselinePolicy,
Cell,
TreePolicy,
UniformPolicy,
validate_cell_weights,
)
from pytyche.experiment.experiment import Experiment
if TYPE_CHECKING:
from pytyche.analysis import PolicyTreeResult
from pytyche.bcf.config import HurdleBCFResult
from pytyche.contracts import (
AnalysisResult,
DiscoveredSegment,
RecommendationSummary,
)
from pytyche.experiment.schedule import Schedule
# Names exported by ``from <this module> import *``; underscore-prefixed
# helpers and the prose templates further down are deliberately left out.
__all__ = [
    "ExpectedLossRule",
    "GraduationCandidate",
    "GraduationRule",
    "NextRoundPlan",
    "cold_start_plan",
    "detect_dropped_treatments",
    "next_fit_num_trees_tau",
    "recommend",
]
#: A segment is credible when its bootstrap stability clears this score —
#: the same 0.80 cutoff as ``posterior.has_credible_segments()``.
#: ``recommend`` compares stability scores against it inclusively (``>=``)
#: when picking the confidence level of the prose summary.
_CREDIBLE_STABILITY = 0.80
[docs]
@runtime_checkable
class GraduationRule(Protocol):
    """Structural interface for graduation decision rules.

    A rule answers one question: given the experiment history, is this
    (treatment, segment) pair a graduation candidate?

    ``history`` is ordered oldest-first and its LAST element is the round
    under evaluation — the engine appends a provisional
    :class:`~pytyche.experiment.experiment.Experiment` for the current
    round before it consults the rule. The default implementation is
    :class:`ExpectedLossRule`.

    Because the protocol is ``runtime_checkable``, ``isinstance`` only
    verifies that an ``is_candidate`` attribute exists; it does not check
    the signature.
    """

    def is_candidate(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> bool:
        """Return whether the pair qualifies as a graduation candidate."""
        ...
[docs]
@dataclasses.dataclass(frozen=True)
class ExpectedLossRule:
    """Default graduation rule: sustained compound-threshold evidence.

    A round "holds" for a (treatment, segment) pair when its decision
    evidence clears all three strict thresholds at once:

    * ``expected_loss_comparison < expected_loss_max``
    * ``probability_positive > p_positive_threshold``
    * ``probability_better > p_better_threshold``

    The pair is a candidate when the LAST ``sustained_rounds`` rounds of
    the history all hold (this round and the ``sustained_rounds - 1``
    before it). Evidence is recomputed per round from each history
    entry's stored posterior via
    ``experiment.posterior.recommendation_summary(treatment,
    segment=segment)`` — array math on stored draws, no extra state.

    Graduation runs on RAW posterior probabilities in v0.2: the
    calibration artifact's correction scope is intervals-only, so the
    probability and expected-loss inputs here come from raw posterior
    draws even when a calibration artifact is attached. When the
    calibration track's p-curve extension lands, graduation becomes
    calibrated with no API change here.

    The defaults are the canonical thresholds; calibrate to your domain.
    """

    expected_loss_max: float = 0.005
    p_positive_threshold: float = 0.95
    p_better_threshold: float = 0.80
    sustained_rounds: int = 2

    def is_candidate(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> bool:
        """Whether the thresholds held in the last ``sustained_rounds``."""
        streak = self.consecutive_held(treatment, segment, history)
        return streak >= self.sustained_rounds

    def consecutive_held(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> int:
        """Length of the trailing run of rounds that cleared the thresholds.

        The engine records this on :class:`GraduationCandidate` — a pair
        holding five consecutive rounds reports 5, not the rule's
        configured minimum.
        """
        # Walk newest-to-oldest; the index of the first failing round is
        # exactly the number of rounds that held before it.
        for streak, experiment in enumerate(reversed(history)):
            if not self._passes(experiment, treatment, segment):
                return streak
        return len(history)

    def _passes(
        self,
        experiment: Experiment,
        treatment: str,
        segment: DiscoveredSegment,
    ) -> bool:
        """Whether one round's evidence clears all three thresholds."""
        evidence = experiment.posterior.recommendation_summary(
            treatment, segment=segment
        )
        # Written as ``not (a < b)`` rather than ``a >= b`` so that a NaN
        # comparison still counts as a failure.
        if not (evidence.expected_loss_comparison < self.expected_loss_max):
            return False
        if not (evidence.probability_positive > self.p_positive_threshold):
            return False
        return evidence.probability_better > self.p_better_threshold
[docs]
@dataclasses.dataclass(frozen=True)
class GraduationCandidate:
    """One (treatment, segment) pair for which the graduation rule fired.

    Purely informational: the library never auto-graduates, so the
    operator (or an automated workflow) decides whether to ship.

    Fields:
        treatment: The treatment name.
        segment: The discovered segment where graduation fired.
        sustained_rounds: Count of consecutive rounds the rule has held.
        latest_recommendation: The current round's per-(treatment,
            segment) decision evidence.
    """

    treatment: str
    segment: DiscoveredSegment
    sustained_rounds: int
    latest_recommendation: RecommendationSummary

    def __repr__(self) -> str:
        """Three-line, human-readable summary of the candidate's evidence."""
        summary = self.latest_recommendation
        headline = (
            f"GraduationCandidate — {self.treatment!r} @ "
            f"{self.segment.rule.description}, "
            f"sustained {self.sustained_rounds} round(s)"
        )
        loss_line = (
            f" expected loss if shipped: "
            f"{summary.expected_loss_comparison:.4f}/visitor"
            f" P(lift > 0) = {summary.probability_positive:.2f}"
        )
        value_line = (
            f" value of one more round: "
            f"{summary.expected_value_of_one_more_round:.4f}/visitor"
        )
        return "\n".join([headline, loss_line, value_line])
[docs]
@dataclasses.dataclass(frozen=True)
class NextRoundPlan:
    """What the recommendation engine proposes for the upcoming round.

    Exactly ONE cell structure is proposed per round; the operator may
    accept it, override parts of it, or replace it before shipping.

    Fields:
        n_visitors: ``schedule.next_round_size(rounds_completed)``;
            ``None`` when the schedule is exhausted (the final round's
            plan has no next round to size).
        cells: The recommended cell structure (weights sum to 1.0).
        treatments: Treatments active going into the next round.
        dropped_treatments: Treatments dropped between this round and the
            next; disjoint from ``treatments``.
        graduation_candidates: (treatment, segment) pairs that met the
            graduation rule. Candidates' treatments stay active — no
            auto-graduation.
        prose_summary: Multi-paragraph PM-readable rationale
            (template-formatted, deterministic).
        tree: The round's policy-tree fit, from which the Optimized
            cell was built — segments, stability scores, allocation
            map. ``None`` only on the round-0 cold-start plan, where no
            fit exists yet. Consumers needing the round's tree (the
            truth comparison, next-fit ``num_trees_tau`` sizing) read
            it here rather than refitting.

    Raises:
        ValueError: At construction, when a name appears in both
            ``treatments`` and ``dropped_treatments``. Cell weights are
            additionally checked by ``validate_cell_weights``.
    """

    n_visitors: int | None
    cells: list[Cell]
    treatments: list[str]
    dropped_treatments: list[str]
    graduation_candidates: list[GraduationCandidate]
    prose_summary: str
    tree: PolicyTreeResult | None = None

    def __post_init__(self) -> None:
        validate_cell_weights(self.cells)
        still_active = set(self.treatments)
        overlap = sorted(
            name for name in set(self.dropped_treatments) if name in still_active
        )
        if not overlap:
            return
        raise ValueError(
            f"dropped_treatments must be disjoint from treatments; "
            f"{overlap} appear in both"
        )

    def _size_text(self) -> str:
        """Human-readable size of the next round."""
        return (
            "no next round (schedule exhausted)"
            if self.n_visitors is None
            else f"{self.n_visitors:,} visitors"
        )

    def _title(self) -> str:
        """Headline shared by the text and HTML representations."""
        return f"NextRoundPlan — {self._size_text()}, {len(self.cells)} cell(s)"

    def _footer_lines(self) -> list[str]:
        """Optional trailing notes: drops, candidate count, cold start."""
        notes: list[str] = []
        if self.dropped_treatments:
            notes.append(f"dropped: {self.dropped_treatments}")
        if self.graduation_candidates:
            notes.append(
                f"graduation candidates: {len(self.graduation_candidates)}"
            )
        if self.tree is None:
            notes.append("cold start — no fitted tree yet")
        return notes

    def __repr__(self) -> str:
        rows = [self._title()]
        for cell in self.cells:
            rows.append(
                f" {cell.id:<12} weight {cell.weight:.2f} "
                f"{cell.policy.describe()}"
            )
        for note in self._footer_lines():
            rows.append(f" {note}")
        return "\n".join(rows)

    def _repr_html_(self) -> str:
        # pandas is imported lazily so it is only needed for rich display.
        import pandas as pd

        header = html.escape(self._title())
        records = [
            {
                "cell": cell.id,
                "weight": f"{cell.weight:.2f}",
                "policy": cell.policy.describe(),
            }
            for cell in self.cells
        ]
        table_html = pd.DataFrame(records).to_html(index=False, border=0)
        footer_html = "".join(
            f"<p>{html.escape(note)}</p>" for note in self._footer_lines()
        )
        return f"<div><b>{header}</b>{table_html}{footer_html}</div>"
[docs]
def cold_start_plan(
    treatments: list[str], n_visitors: int | None
) -> NextRoundPlan:
    """Build the default round-0 plan: Control and Explore at 50/50.

    With no posterior there is no tree to derive an Optimized cell from,
    so the plan holds only two cells — Control measures the baseline
    cleanly and Explore feeds the first fit uniform-random signal. The
    returned plan carries no tree, no drops and no graduation candidates.

    Args:
        treatments: The experiment's full treatments universe (control
            first).
        n_visitors: The first round's size per the schedule; ``None``
            when the schedule offers no rounds.

    Returns:
        The default :class:`NextRoundPlan` for round 0.
    """
    control_cell = Cell(id="control", policy=BaselinePolicy(), weight=0.5)
    explore_cell = Cell(
        id="explore", policy=UniformPolicy(over=treatments), weight=0.5
    )
    return NextRoundPlan(
        n_visitors=n_visitors,
        cells=[control_cell, explore_cell],
        # Copy so the plan does not alias the caller's list.
        treatments=list(treatments),
        dropped_treatments=[],
        graduation_candidates=[],
        prose_summary=_COLD_START_PROSE,
    )
[docs]
def next_fit_num_trees_tau(
    posterior: HurdleBCFResult,
    tree_result: PolicyTreeResult,
    cumulative_n: int,
) -> int:
    """Size the next fit's ``num_trees_tau`` from this round's evidence.

    Delegates to ``compute_num_trees_tau`` at its defaults, passing:

    * ``d_tau`` — the number of distinct features the fitted policy tree
      splits on (floored at 1: a root-only tree still has one effective
      dimension);
    * ``sigma_tau`` — the standard deviation of the posterior-mean CATEs.

    Args:
        posterior: This round's hurdle posterior.
        tree_result: This round's policy-tree fit.
        cumulative_n: Total visitors across all rounds including the
            next one being sized.

    Returns:
        Tree count for the next fit's ``GPUBCFConfig.num_trees_tau``.
    """
    node_features = np.asarray(tree_result.tree.tree_.feature)
    # Negative entries are filtered out so only real split features count.
    used_features = node_features[node_features >= 0]
    d_tau = max(1, len(set(used_features.tolist())))

    # Averaged over axis 1 — presumably the draw axis; confirm against the
    # layout of ``rpv_cate_samples``.
    posterior_mean_cate = np.asarray(posterior.rpv_cate_samples).mean(axis=1)
    sigma_tau = float(np.std(posterior_mean_cate))

    return compute_num_trees_tau(cumulative_n, d_tau=d_tau, sigma_tau=sigma_tau)
[docs]
def detect_dropped_treatments(
    allocation_history: Sequence[dict[int, dict[str, float]]],
    treatments: list[str],
    *,
    epsilon: float = 0.02,
    sustained_rounds: int = 2,
) -> list[str]:
    """Treatments whose allocation starved below *epsilon* everywhere.

    A treatment is dropped when its allocation is strictly below
    *epsilon* in EVERY segment for the last *sustained_rounds*
    consecutive rounds. A name absent from a segment's weights counts as
    zero allocation. A round whose allocation map has no segments at all
    is vacuously "starved" for every name.

    Args:
        allocation_history: Per-round allocation maps
            (``{leaf_id: {treatment_name: weight}}``), oldest first.
        treatments: Names to test; the result preserves this order.
        epsilon: The starvation threshold (the Thompson clip floor).
        sustained_rounds: Consecutive starved rounds required; must be
            at least 1.

    Returns:
        Dropped names in *treatments* order; empty when the history is
        shorter than *sustained_rounds*.

    Raises:
        ValueError: When ``sustained_rounds`` is less than 1.
    """
    # A window of zero rounds would make ``[-0:]`` select the WHOLE history
    # (and, on an empty history, vacuously drop every treatment); negative
    # values would slice from the front. Neither is a meaningful request.
    if sustained_rounds < 1:
        raise ValueError(
            f"sustained_rounds must be >= 1; got {sustained_rounds}"
        )
    if len(allocation_history) < sustained_rounds:
        return []

    # ``list(...)`` keeps non-sliceable sequences (e.g. deque) working.
    recent = list(allocation_history)[-sustained_rounds:]

    def starved(name: str) -> bool:
        """Whether *name* sits below the floor in every recent segment."""
        return all(
            weights.get(name, 0.0) < epsilon
            for round_map in recent
            for weights in round_map.values()
        )

    return [name for name in treatments if starved(name)]
[docs]
def recommend(
    analysis: AnalysisResult,
    *,
    history: list[Experiment],
    treatments: list[str],
    schedule: Schedule,
    min_control_weight: float = 0.05,
    min_explore_weight: float = 0.05,
    max_segment_depth: int = 3,
    min_segment_share: float = 0.10,
    graduation_rule: GraduationRule | None = None,
    seed: int = 0,
) -> NextRoundPlan:
    """Assemble the :class:`NextRoundPlan` that follows this round's analysis.

    The function composes the shipped L2 primitives rather than
    reimplementing them: the policy tree and its ε-clipped Thompson
    allocation come from ``analysis.posterior.fit_policy_tree(...)``, and
    graduation evidence from ``posterior.recommendation_summary(...)``.

    Steps, in order:

    1. Fit the policy tree for this round.
    2. Collect the Optimized-cell allocation maps of prior rounds plus
       this round's, and drop treatments starved below the allocation
       floor for consecutive rounds (control is never droppable).
    3. Propose the Control + Explore + Optimized triple, with Control and
       Explore at their floor weights and Optimized taking the rest.
    4. Consult the graduation rule per (active non-control treatment ×
       segment) pair over ``[*history, <this round>]``.
    5. Pick a confidence level and compose the prose summary.

    Args:
        analysis: This round's analysis; the posterior rides on
            ``analysis.posterior``.
        history: Prior rounds' experiments, oldest first (this round is
            NOT included; the engine appends its own provisional view).
        treatments: The experiment's full treatments universe (control
            first).
        schedule: Sizes the next round via
            ``next_round_size(len(history) + 1)``.
        min_control_weight: Guaranteed Control-cell share.
        min_explore_weight: Guaranteed Explore-cell share.
        max_segment_depth: Policy-tree depth bound.
        min_segment_share: Minimum per-leaf population share.
        graduation_rule: ``None`` uses :class:`ExpectedLossRule` at its
            defaults.
        seed: Bootstrap seed for the policy tree's stability scores.

    Returns:
        The assembled :class:`NextRoundPlan`.

    Raises:
        ValueError: When ``min_control_weight + min_explore_weight >=
            1.0``, or when the posterior carries no observed data.
    """
    floor_total = min_control_weight + min_explore_weight
    if floor_total >= 1.0:
        raise ValueError(
            f"min_control_weight + min_explore_weight must be < 1.0 to leave "
            f"room for the Optimized cell; got {min_control_weight} + "
            f"{min_explore_weight} = {floor_total}"
        )

    posterior = analysis.posterior
    observed = posterior.observed
    if observed is None:
        raise ValueError(
            "analysis.posterior carries no observed data (observed is None); "
            "the engine needs visitor rows to segment and allocate over"
        )

    tree_result = posterior.fit_policy_tree(
        max_depth=max_segment_depth,
        min_segment_share=min_segment_share,
        bootstrap_seed=seed,
    )

    # Prior rounds contribute a map only where their stored plan had an
    # Optimized tree cell; this round's map always closes the sequence.
    allocation_history = []
    for prior in history:
        prior_map = _optimized_allocation_map(prior)
        if prior_map is not None:
            allocation_history.append(prior_map)
    allocation_history.append(tree_result.allocation_map)

    # Control is the permanent lift reference — never droppable, however
    # starved its Thompson allocation gets.
    droppable = [
        name for name in treatments if name != observed.control_name
    ]
    dropped = detect_dropped_treatments(allocation_history, droppable)
    active = [name for name in treatments if name not in dropped]

    control_cell = Cell(
        id="control", policy=BaselinePolicy(), weight=min_control_weight
    )
    explore_cell = Cell(
        id="explore",
        policy=UniformPolicy(over=active, treatments=treatments),
        weight=min_explore_weight,
    )
    optimized_cell = Cell(
        id="optimized",
        policy=TreePolicy(
            tree_result.tree,
            tree_result.allocation_map,
            treatments=treatments,
        ),
        weight=1.0 - floor_total,
    )
    cells = [control_cell, explore_cell, optimized_cell]

    n_visitors = schedule.next_round_size(len(history) + 1)

    # Stand-in for the round under evaluation so the rule sees it as the
    # last history entry; the shipping-related fields are left empty.
    provisional = Experiment(
        round_idx=len(history),
        posterior=posterior,
        analysis=analysis,
        cells_shipped=[],
        cell_observations=[],
        next_recommendation=None,
        truth_comparison=None,
    )
    history_view = [*history, provisional]

    rule: GraduationRule = (
        ExpectedLossRule() if graduation_rule is None else graduation_rule
    )

    candidates = []
    for name in active:
        if name == observed.control_name:
            continue
        for segment in tree_result.segments:
            if not rule.is_candidate(name, segment, history_view):
                continue
            candidates.append(
                GraduationCandidate(
                    treatment=name,
                    segment=segment,
                    sustained_rounds=_held_rounds(
                        rule, name, segment, history_view
                    ),
                    latest_recommendation=posterior.recommendation_summary(
                        name, segment=segment
                    ),
                )
            )

    has_credible = any(
        score >= _CREDIBLE_STABILITY
        for score in tree_result.stability_scores.values()
    )
    if not has_credible:
        confidence = "low"
    elif candidates:
        confidence = "high"
    else:
        confidence = "medium"

    prose = _compose_prose(
        confidence=confidence,
        decision=analysis.recommendation.decision.name,
        n_visitors=n_visitors,
        cells=cells,
        n_segments=len(tree_result.segments),
        candidates=candidates,
        dropped=dropped,
    )
    return NextRoundPlan(
        n_visitors=n_visitors,
        cells=cells,
        treatments=active,
        dropped_treatments=dropped,
        graduation_candidates=candidates,
        prose_summary=prose,
        tree=tree_result,
    )
@runtime_checkable
class _SupportsConsecutiveHeld(Protocol):
    """Optional rule capability: report the evidence-held count directly.

    Used with ``isinstance`` to detect rules that expose a
    ``consecutive_held`` method; the runtime check only looks for the
    attribute, not its signature.
    """

    def consecutive_held(
        self,
        treatment: str,
        segment: DiscoveredSegment,
        history: list[Experiment],
    ) -> int:
        """Return how many trailing rounds of *history* held for the pair."""
        ...
def _held_rounds(
    rule: GraduationRule,
    treatment: str,
    segment: DiscoveredSegment,
    history_view: list[Experiment],
) -> int:
    """Number of consecutive rounds the rule has held for the pair.

    Rules that expose ``consecutive_held`` (the default
    :class:`ExpectedLossRule` does) are asked for their own evidence
    count. A custom rule only guarantees a fired/not-fired answer, so the
    count is reconstructed by replaying the rule against ever-shorter
    prefixes of the history: it is the number of trailing rounds at whose
    end the rule fired (>= 1 when the rule fired on the full history).
    """
    if isinstance(rule, _SupportsConsecutiveHeld):
        return int(rule.consecutive_held(treatment, segment, history_view))

    total_rounds = len(history_view)
    streak = 0
    # Prefix lengths go total, total - 1, ..., 1; stop at the first prefix
    # on which the rule does not fire.
    while streak < total_rounds and rule.is_candidate(
        treatment, segment, history_view[: total_rounds - streak]
    ):
        streak += 1
    return streak
def _optimized_allocation_map(
    experiment: Experiment,
) -> dict[int, dict[str, float]] | None:
    """Return a prior round's Optimized-cell allocation map, if it has one.

    Looks at the plan stored on ``experiment.next_recommendation`` and
    returns the allocation map of the first cell whose id starts with
    ``"optimized"`` and whose policy is a :class:`TreePolicy`. Returns
    ``None`` when there is no stored plan or no such cell.
    """
    stored_plan = experiment.next_recommendation
    if stored_plan is None:
        return None

    for cell in stored_plan.cells:
        # The id check runs first, so non-optimized cells are skipped
        # without inspecting their policy type.
        if not cell.id.startswith("optimized"):
            continue
        if isinstance(cell.policy, TreePolicy):
            return cell.policy.allocation_map
    return None
# ---------------------------------------------------------------------------
# Prose templates — the single home of every operator-facing phrase the
# engine emits. Deterministic: no LLM, no randomness, no timestamps.
# ---------------------------------------------------------------------------
# Rationale attached to the round-0 plan built by ``cold_start_plan``:
# two paragraphs separated by a blank line.
_COLD_START_PROSE = (
    "Round 1 starts from a cold start: no posterior exists yet, so there is "
    "no optimized policy to route traffic through.\n\n"
    "The plan splits traffic evenly between a Control cell (clean baseline "
    "measurement) and an Explore cell (uniform-random over the treatments) "
    "to give the first fit clean signal for effect discovery."
)
# Opening sentence of the summary, keyed by the confidence level that
# ``recommend`` derives ("high" / "medium" / "low"). ``_compose_prose``
# indexes this dict directly, so an unknown level raises ``KeyError``.
_CONFIDENCE_OPENERS: dict[str, str] = {
    "high": (
        "We have strong, sustained evidence: at least one discovered segment "
        "is stable across bootstrap refits, and a treatment has met the "
        "graduation thresholds in consecutive rounds."
    ),
    "medium": (
        "Segment structure is firming up: at least one discovered segment is "
        "stable across bootstrap refits, but no treatment has sustained the "
        "graduation thresholds yet."
    ),
    "low": (
        "Early signal only: no discovered segment is stable enough to act on "
        "yet, so the plan keeps exploring while the optimized policy "
        "concentrates traffic where the model currently expects value."
    ),
}
# Sentence describing the round's summary decision, keyed by the decision's
# ``.name`` as passed in by ``recommend``.
_DECISION_FRAGMENTS: dict[str, str] = {
    "SHIP": (
        "This round's summary decision is SHIP — the leading treatment "
        "clears the decision thresholds on the current evidence."
    ),
    "CONTINUE": (
        "This round's summary decision is CONTINUE — keep collecting data "
        "before committing."
    ),
    "STOP": (
        "This round's summary decision is STOP — the evidence points to a "
        "harmful or futile treatment."
    ),
}
# Used by ``_compose_prose`` for any decision name without an entry above;
# ``{name}`` is filled with that decision name.
_DECISION_FALLBACK = "This round's summary decision is {name}."
def _compose_prose(
    *,
    confidence: str,
    decision: str,
    n_visitors: int | None,
    cells: list[Cell],
    n_segments: int,
    candidates: list[GraduationCandidate],
    dropped: list[str],
) -> str:
    """Render the plan's rationale from the module's prose templates.

    The result always has two paragraphs — a confidence opener joined
    with the decision sentence, then the next-round sizing and cell
    split — followed by an optional paragraph listing graduation
    candidates and an optional paragraph listing dropped treatments.
    Paragraphs are separated by a blank line.

    Args:
        confidence: Key into ``_CONFIDENCE_OPENERS``.
        decision: Decision name; unknown names use the generic fallback
            sentence.
        n_visitors: Next round's size, or ``None`` when there is none.
        cells: Proposed cells, listed as ``<id> <weight as percent>``.
        n_segments: Number of segments in the policy tree.
        candidates: Graduation candidates to surface, if any.
        dropped: Treatments removed from the active list, if any.

    Returns:
        The assembled multi-paragraph summary.
    """
    split = ", ".join(f"{cell.id} {cell.weight:.0%}" for cell in cells)

    if n_visitors is None:
        sizing = "no further rounds (the schedule is exhausted)"
    else:
        sizing = f"{n_visitors:,} visitors"

    if decision in _DECISION_FRAGMENTS:
        decision_fragment = _DECISION_FRAGMENTS[decision]
    else:
        decision_fragment = _DECISION_FALLBACK.format(name=decision)

    opening = f"{_CONFIDENCE_OPENERS[confidence]} {decision_fragment}"
    next_round = (
        f"Next round: {sizing}, split {split}. The optimized cell routes "
        f"through a policy tree over {n_segments} discovered segment(s) "
        f"with per-segment Thompson allocation."
    )
    paragraphs = [opening, next_round]

    if candidates:
        described = []
        for candidate in candidates:
            described.append(
                f"{candidate.treatment} in segment {candidate.segment.id} "
                f"({candidate.segment.rule.description})"
            )
        pairs = "; ".join(described)
        paragraphs.append(
            f"Graduation candidates (surfaced for the operator — nothing is "
            f"auto-graduated): {pairs}."
        )

    if dropped:
        dropped_names = ", ".join(dropped)
        paragraphs.append(
            f"Dropped from the active list (allocation starved below the "
            f"floor in every segment for consecutive rounds): "
            f"{dropped_names}."
        )

    return "\n\n".join(paragraphs)