"""The ``SequentialExperiment`` state machine — the L1 round-by-round loop.
Each ``advance()`` (equivalently ``next(exp)``) runs one round: size the
round from the schedule, hand the current
:class:`~pytyche.experiment.recommendation.NextRoundPlan` to the
generator, fit the joint hurdle BCF on the CUMULATIVE data, analyze,
score the round's cells, ask the recommendation engine for the next
plan, and (sim mode) evaluate against the cumulative ground truth. All
state commits atomically at the end of the round — a failed step leaves
``history`` untouched.
The cumulative row order is variant-major: all rounds of the first-seen
variant, then all rounds of the next. Per-round artifacts aligned to a
round's own concatenation (truth arrays, the ``cell`` column) are
re-indexed into that order via the position map ``_build_cumulative``
returns.
"""
from __future__ import annotations
import dataclasses
import sys
import warnings
from collections.abc import Callable
from typing import TYPE_CHECKING, Literal
import numpy as np
import pandas as pd
from pytyche.analysis import analyze, evaluate_against_truth
from pytyche.bcf.config import HurdleBCFResult
from pytyche.bcf.hurdle import fit_hurdle_bcf
from pytyche.contracts import (
RESERVED_CELL_COLUMN,
AlignedVisitorArray,
CalibrationTruth,
ObservedExperimentData,
VariantData,
)
from pytyche.experiment.cells import Cell, validate_cell_weights
from pytyche.experiment.experiment import (
Experiment,
compute_cell_observations,
)
from pytyche.experiment.recommendation import (
GraduationCandidate,
GraduationRule,
NextRoundPlan,
cold_start_plan,
next_fit_num_trees_tau,
recommend,
)
if TYPE_CHECKING:
from collections.abc import Sequence
from pytyche.analysis import TruthComparison
from pytyche.calibrate.artifact import Calibration
from pytyche.contracts import AnalysisResult
from pytyche.experiment.schedule import Schedule
# Public names exported by ``from <module> import *``.
__all__ = [
    "Generator",
    "SequentialExperiment",
    "UncalibratedWarning",
    "sequential_experiment",
]
#: The single data-source contract (a type alias). The loop invokes
#: ``generator(round_idx, plan)`` once per round, right after the round's
#: plan is resolved, and receives ``(observed, truth)`` — ``truth`` is
#: non-None in sim mode and ``None`` in real-data mode (the mode may not
#: flip between rounds). Every returned visitors frame must carry the
#: reserved ``cell`` column recording each visitor's allocated cell id,
#: and generators assigning adaptively record per-visitor propensities in
#: the reserved propensity columns (``propensity`` at K = 2,
#: ``propensity_1..propensity_{K-1}`` at K >= 3).
Generator = Callable[
    [int, NextRoundPlan],
    tuple[ObservedExperimentData, CalibrationTruth | None],
]
#: Cell-assignment sentinel for prior rounds' rows — matches no plan cell
#: (``advance()`` rejects a plan cell carrying this id), so per-cell
#: observations cover the current round's members only.
_PAST_ROUNDS_SENTINEL = "__past__"
class UncalibratedWarning(UserWarning):
    """Warning category for a :class:`SequentialExperiment` run uncalibrated.

    Raised as a warning at most once per instance, from an ``advance()``
    call on an instance that was built with ``calibration=None``. Passing
    an SBC-fitted :class:`~pytyche.calibrate.artifact.Calibration`
    artifact through the ``calibration=`` constructor parameter corrects
    interval coverage and keeps the warning from firing.
    """
class SequentialExperiment:
    """Stateful round-by-round adaptive experiment loop.

    The instance is its own iterator: ``next(exp)`` (or ``exp.advance()``)
    runs one round and returns the resulting
    :class:`~pytyche.experiment.experiment.Experiment` snapshot;
    iteration raises ``StopIteration`` when the schedule is exhausted.
    Configuration is locked at construction; per-round cell overrides go
    through ``advance(cells=...)``.

    Scope: this surface assumes designed experiments — the operator emits
    the assignment rules (the plan's cells and policies), the platform
    assigns per those rules, and per-visitor propensities are known and
    recorded. Heavily-confounded observational inference is out of scope
    for this API; for that setting, consider econml or DoubleML.

    Args:
        generator: ``generator(round_idx, plan)`` returning
            ``(observed, truth | None)`` — the single data-source
            contract. Sim-mode generators return ground truth; real-data
            generators return ``None`` (the mode may not flip between
            rounds). Every visitors frame must carry the reserved
            ``cell`` column recording each visitor's allocated cell id.
        schedule: Per-round visitor counts
            (:class:`~pytyche.experiment.schedule.Schedule` protocol).
        treatments: The full treatments universe, control first. Drops
            are recomputed from allocation history each round; dropped
            names never leave this universe.
        cells: Optional round-0 cell-structure override replacing the
            cold-start Control/Explore 50/50 split.
        min_control_weight: Guaranteed Control-cell share on
            engine-proposed plans.
        min_explore_weight: Guaranteed Explore-cell share on
            engine-proposed plans.
        max_segment_depth: Policy-tree depth bound for the per-round
            analysis and recommendation.
        min_segment_share: Minimum per-leaf population share.
        calibration: SBC-fitted artifact applied on-path each round, or
            ``None`` (uncalibrated; warns once per instance).
        graduation_rule: ``None`` uses
            :class:`~pytyche.experiment.recommendation.ExpectedLossRule`
            at its defaults.
        seed: Master seed — per-round fit seeds derive from it, and it
            seeds the policy tree's stability bootstrap.
        progress: When True, each round's fit renders tqdm progress bars
            on stderr (passed through as
            ``fit_hurdle_bcf(progress=True)``). Default False — silent.

    Raises:
        ValueError: When ``min_control_weight + min_explore_weight``
            is not below 1.0 (no room for the Optimized cell), or when
            ``validate_cell_weights`` rejects the ``cells`` override.
    """

    def __init__(
        self,
        *,
        generator: Generator,
        schedule: Schedule,
        treatments: Sequence[str],
        cells: list[Cell] | None = None,
        min_control_weight: float = 0.05,
        min_explore_weight: float = 0.05,
        max_segment_depth: int = 3,
        min_segment_share: float = 0.10,
        calibration: Calibration | None = None,
        graduation_rule: GraduationRule | None = None,
        seed: int = 0,
        progress: bool = False,
    ) -> None:
        """Validate the configuration, lock it, and start with no history."""
        if min_control_weight + min_explore_weight >= 1.0:
            raise ValueError(
                f"min_control_weight + min_explore_weight must be < 1.0 to "
                f"leave room for the Optimized cell; got {min_control_weight}"
                f" + {min_explore_weight} = "
                f"{min_control_weight + min_explore_weight}"
            )
        if cells is not None:
            validate_cell_weights(cells)

        # Locked configuration. Sequences are copied so later mutation of
        # the caller's lists cannot change the experiment.
        self._generator = generator
        self._schedule = schedule
        self._treatments = list(treatments)
        self._cells = list(cells) if cells is not None else None
        self._min_control_weight = min_control_weight
        self._min_explore_weight = min_explore_weight
        self._max_segment_depth = max_segment_depth
        self._min_segment_share = min_segment_share
        self._calibration = calibration
        self._graduation_rule = graduation_rule
        self._seed = seed
        self._progress = progress

        # Mutable loop state, appended to only at the end of a successful
        # round (one entry per completed round in each list).
        self._history: list[Experiment] = []
        self._round_data: list[ObservedExperimentData] = []
        self._round_truths: list[CalibrationTruth | None] = []
        # The engine's proposal for the upcoming round; None before round 0.
        self._current_plan: NextRoundPlan | None = None
        self._warned_uncalibrated = False

    # ------------------------------------------------------------------
    # Iteration
    # ------------------------------------------------------------------
    def __iter__(self) -> SequentialExperiment:
        """Return ``self`` — the instance is its own iterator."""
        return self

    def __next__(self) -> Experiment:
        """Run one round; equivalent to ``advance()`` with no override."""
        return self.advance()

    def advance(self, cells: list[Cell] | None = None) -> Experiment:
        """Run one round and return its
        :class:`~pytyche.experiment.experiment.Experiment` snapshot.

        ``cells`` overrides this round's cell structure only; subsequent
        rounds revert to the recommendation engine's proposals. Overrides
        must satisfy the weight-sum invariant (checked with
        ``validate_cell_weights``, as for the constructor's ``cells``) but
        are NOT subject to the engine's min-weight floors.

        All state commits only after every step succeeds — a failure at
        any step (missing ``cell`` column, calibration regime mismatch,
        ...) leaves ``history`` and the cumulative data unchanged. The
        round's data is fully validated — including its cell labels —
        before the cumulative fit starts, so malformed data fails fast.

        Raises:
            StopIteration: When the schedule is exhausted.
            ValueError: When the override cells' weights do not sum to
                1.0, when a plan cell uses the reserved prior-rounds
                sentinel id, when round data lacks the reserved ``cell``
                column or labels visitors with ids outside the round's
                plan, when a variant name falls outside the treatments
                universe, or when the round's identity
                (``experiment_id``, ``metric``, sim/real mode) conflicts
                with prior rounds.
        """
        round_idx = len(self._history)
        size = self._schedule.next_round_size(round_idx)
        if size is None:
            raise StopIteration

        # Validate the per-round override the same way __init__ validates
        # its round-0 override, before any data is generated.
        if cells is not None:
            validate_cell_weights(cells)

        # Resolve the plan: cold start (optionally with the constructor's
        # cells) on round 0, otherwise the engine's previous proposal; a
        # per-call override replaces the cells of either.
        if self._current_plan is None:
            plan = cold_start_plan(list(self._treatments), n_visitors=size)
            if self._cells is not None:
                plan = dataclasses.replace(plan, cells=list(self._cells))
        else:
            plan = self._current_plan
        if cells is not None:
            plan = dataclasses.replace(plan, cells=list(cells))
        if any(c.id == _PAST_ROUNDS_SENTINEL for c in plan.cells):
            raise ValueError(
                f"cell id {_PAST_ROUNDS_SENTINEL!r} is reserved for the "
                "loop's prior-round bookkeeping and cannot name a plan cell"
            )

        data, truth = self._generator(round_idx, plan)
        self._validate_round(data, truth)

        # Check the round's cell labels BEFORE the expensive cumulative
        # fit: the labels depend only on the generator's output and the
        # plan, so a mislabeled round should not cost a full fit first.
        current_cells = pd.concat(
            [v.visitors[RESERVED_CELL_COLUMN] for v in data.variants],
            ignore_index=True,
        ).to_numpy()
        stray = set(current_cells) - {c.id for c in plan.cells}
        try:
            unknown = sorted(stray)
        except TypeError:
            # Mixed-type stray labels (e.g. NaN/None next to strings) are
            # not mutually orderable; sort by repr so the documented
            # ValueError below is raised instead of a TypeError.
            unknown = sorted(stray, key=repr)
        if unknown:
            raise ValueError(
                f"round {round_idx} {RESERVED_CELL_COLUMN!r} column carries "
                f"ids {unknown} that match no cell in the round's plan "
                f"{sorted(c.id for c in plan.cells)}; every visitor must be "
                "labeled with one of the plan's cell ids"
            )

        if self._calibration is None and not self._warned_uncalibrated:
            self._warned_uncalibrated = True
            # stacklevel=3 attributes past advance() AND __next__ at the
            # user's `next(exp)` line — the iteration path is the
            # documented surface, and library paths must not leak into
            # rendered notebook output. Direct advance() callers get
            # one frame higher; same compromise as NoCudaWarning.
            warnings.warn(
                "Running uncalibrated BCF posteriors (calibration=None); "
                "interval coverage may be miscalibrated at scale. Supply an "
                "SBC-fitted artifact via sequential_experiment("
                "calibration=...) to correct it.",
                UncalibratedWarning,
                stacklevel=3,
            )

        cumulative, positions = _build_cumulative([*self._round_data, data])
        n_total = sum(v.n_visitors for v in cumulative.variants)
        analysis = self._fit_round(cumulative, round_idx)
        posterior = analysis.posterior
        assert isinstance(posterior, HurdleBCFResult)  # the loop fits hurdle

        # Every cumulative row starts as "past round"; only the current
        # round's rows (positions[-1]) receive their real cell id.
        assignment = np.full(n_total, _PAST_ROUNDS_SENTINEL, dtype=object)
        assignment[positions[-1]] = current_cells
        cell_observations = compute_cell_observations(
            posterior, plan.cells, assignment
        )

        next_plan = recommend(
            analysis,
            history=list(self._history),
            treatments=list(self._treatments),
            schedule=self._schedule,
            min_control_weight=self._min_control_weight,
            min_explore_weight=self._min_explore_weight,
            max_segment_depth=self._max_segment_depth,
            min_segment_share=self._min_segment_share,
            graduation_rule=self._graduation_rule,
            seed=self._seed,
        )

        truth_comparison: TruthComparison | None
        if truth is None:
            truth_comparison = None
        else:
            sim_truths = [
                t for t in [*self._round_truths, truth] if t is not None
            ]
            cumulative_truth = _cumulative_truth(
                sim_truths, positions, n_total=n_total
            )
            assert next_plan.tree is not None  # engine plans carry the fit
            truth_comparison = evaluate_against_truth(
                posterior, next_plan.tree, cumulative_truth
            )

        experiment = Experiment(
            round_idx=round_idx,
            posterior=posterior,
            analysis=analysis,
            cells_shipped=list(plan.cells),
            cell_observations=cell_observations,
            next_recommendation=next_plan,
            truth_comparison=truth_comparison,
        )

        # Commit point: nothing above mutated the round history.
        self._round_data.append(data)
        self._round_truths.append(truth)
        self._history.append(experiment)
        self._current_plan = next_plan
        return experiment

    def run_to_completion(self) -> SequentialExperiment:
        """Advance until the schedule is exhausted or a candidate emerges.

        Returns ``self`` for chaining.

        Raises:
            ValueError: When the schedule is open-ended (``n_rounds`` is
                ``None``) — the operator must provide a stop condition.
        """
        if self._schedule.n_rounds is None:
            raise ValueError(
                "run_to_completion requires a bounded schedule, but "
                "schedule.n_rounds is None (open-ended); advance manually "
                "or supply a schedule with n_rounds set"
            )
        while not self.has_graduation_candidate():
            try:
                self.advance()
            except StopIteration:
                break
        return self

    # ------------------------------------------------------------------
    # Per-round fit (the documented test seam)
    # ------------------------------------------------------------------
    def _fit_round(
        self, cumulative: ObservedExperimentData, round_idx: int
    ) -> AnalysisResult:
        """Fit the cumulative data and analyze it — the loop's fit seam.

        The fit seed derives deterministically from the constructor seed
        and the round index via
        ``np.random.SeedSequence((seed, round_idx))`` — distinct rounds
        get decorrelated MCMC streams while the whole series stays
        reproducible from the one constructor seed.

        Round 0 calls ``fit_hurdle_bcf`` without ``num_trees_tau`` (the
        fit's own default applies); later rounds size ``num_trees_tau``
        from the previous round's evidence via
        :func:`~pytyche.experiment.recommendation.next_fit_num_trees_tau`.
        """
        fit_seed = int(
            np.random.SeedSequence((self._seed, round_idx)).generate_state(1)[0]
        )
        cumulative_n = sum(v.n_visitors for v in cumulative.variants)
        if self._progress:
            print(
                f"round {round_idx}: fitting {cumulative_n:,} cumulative visitors",
                file=sys.stderr,
                flush=True,
            )
        if round_idx > 0:
            prev = self._history[-1]
            prev_plan = prev.next_recommendation
            assert prev_plan is not None and prev_plan.tree is not None
            assert isinstance(prev.posterior, HurdleBCFResult)
            posterior = fit_hurdle_bcf(
                cumulative,
                calibration=self._calibration,
                seed=fit_seed,
                progress=self._progress,
                num_trees_tau=next_fit_num_trees_tau(
                    prev.posterior, prev_plan.tree, cumulative_n
                ),
            )
        else:
            posterior = fit_hurdle_bcf(
                cumulative,
                calibration=self._calibration,
                seed=fit_seed,
                progress=self._progress,
            )
        return analyze(
            posterior,
            max_depth=self._max_segment_depth,
            min_segment_share=self._min_segment_share,
            bootstrap_seed=self._seed,
        )

    # ------------------------------------------------------------------
    # Round-data validation
    # ------------------------------------------------------------------
    def _validate_round(
        self, data: ObservedExperimentData, truth: CalibrationTruth | None
    ) -> None:
        """Reject round data that breaks the loop's cross-round contract.

        Checks, in order: every variant frame carries the reserved cell
        column; every variant name is in the treatments universe; and,
        from the second round on, ``experiment_id``, ``metric`` and the
        sim/real mode match the first round.

        Raises:
            ValueError: On the first violated check.
        """
        for variant in data.variants:
            if RESERVED_CELL_COLUMN not in variant.visitors.columns:
                raise ValueError(
                    f"round {len(self._history)} data for variant "
                    f"{variant.name!r} lacks the reserved "
                    f"{RESERVED_CELL_COLUMN!r} column; generators must "
                    "record each visitor's allocated cell id at generation "
                    "time (it is not derivable from the treatment received)"
                )
        rogue = [
            v.name for v in data.variants if v.name not in self._treatments
        ]
        if rogue:
            raise ValueError(
                f"round {len(self._history)} data carries variant names "
                f"{rogue} outside the treatments universe "
                f"{self._treatments}"
            )
        if not self._round_data:
            # First round: nothing to be consistent with yet.
            return
        first = self._round_data[0]
        if data.experiment_id != first.experiment_id:
            raise ValueError(
                f"round data experiment_id {data.experiment_id!r} does not "
                f"match the experiment's {first.experiment_id!r}"
            )
        if data.metric != first.metric:
            raise ValueError(
                f"round data metric {data.metric!r} does not match the "
                f"experiment's {first.metric!r}"
            )
        prior_sim = self._round_truths[0] is not None
        if (truth is not None) != prior_sim:
            raise ValueError(
                "sim/real mode may not flip between rounds: prior rounds "
                f"returned truth {'set' if prior_sim else 'None'} but this "
                f"round returned truth "
                f"{'set' if truth is not None else 'None'}"
            )

    # ------------------------------------------------------------------
    # History + state accessors
    # ------------------------------------------------------------------
    @property
    def history(self) -> list[Experiment]:
        """Completed rounds, oldest first (a defensive copy)."""
        return list(self._history)

    @property
    def latest(self) -> Experiment | None:
        """The most recent round, or ``None`` before the first."""
        return self._history[-1] if self._history else None

    @property
    def current_round_idx(self) -> int:
        """Number of completed rounds (the next round's index)."""
        return len(self._history)

    @property
    def next_recommendation(self) -> NextRoundPlan | None:
        """The engine's plan for the next round; ``None`` before round 0."""
        if not self._history:
            return None
        return self._history[-1].next_recommendation

    # ------------------------------------------------------------------
    # Capability methods (read-only)
    # ------------------------------------------------------------------
    def graduation_candidates(
        self, sustained_rounds: int = 2
    ) -> list[GraduationCandidate]:
        """Latest-round candidates sustained for at least *sustained_rounds*."""
        plan = self.next_recommendation
        if plan is None:
            return []
        return [
            candidate
            for candidate in plan.graduation_candidates
            if candidate.sustained_rounds >= sustained_rounds
        ]

    def has_graduation_candidate(self) -> bool:
        """Whether any candidate meets the default sustained-rounds bar."""
        return bool(self.graduation_candidates())

    def has_credible_segments(self, stability_threshold: float = 0.80) -> bool:
        """Whether any latest-round segment clears *stability_threshold*."""
        if not self._history:
            return False
        return any(
            segment.stability_score >= stability_threshold
            for segment in self._history[-1].analysis.segments
        )

    def dropped_treatments(self) -> list[str]:
        """Universe treatments absent from the latest plan's active list."""
        plan = self.next_recommendation
        if plan is None:
            return []
        return [
            name for name in self._treatments if name not in plan.treatments
        ]

    # ------------------------------------------------------------------
    # Series-level summary + confidence
    # ------------------------------------------------------------------
    @property
    def summary(self) -> str:
        """Deterministic multi-paragraph prose over the series so far."""
        if not self._history:
            return (
                "No rounds completed yet.\n\n"
                f"The experiment is configured over treatments "
                f"{self._treatments} and advances one round per "
                "next(exp) / advance() call."
            )
        latest = self._history[-1]
        plan = latest.next_recommendation
        assert plan is not None  # yielded experiments always carry a plan
        paragraphs = [
            (
                f"{len(self._history)} round(s) completed; confidence is "
                f"{self.confidence}. Latest {latest.summary_one_line()}"
            ),
            plan.prose_summary,
        ]
        candidates = self.graduation_candidates()
        if candidates:
            pairs = "; ".join(
                f"{candidate.treatment} in segment {candidate.segment.id}"
                for candidate in candidates
            )
            paragraphs.append(f"Graduation candidates: {pairs}.")
        dropped = self.dropped_treatments()
        if dropped:
            paragraphs.append(
                f"Dropped from the active list: {', '.join(dropped)}."
            )
        return "\n\n".join(paragraphs)

    @property
    def confidence(self) -> Literal["high", "medium", "low"]:
        """One-word label from the credibility + graduation state.

        ``"high"`` needs a credible segment (stability >= 0.80) AND a
        graduation candidate; ``"medium"`` needs only the credible
        segment; everything else is ``"low"``.
        """
        credible = self.has_credible_segments(0.80)
        if credible and self.has_graduation_candidate():
            return "high"
        if credible:
            return "medium"
        return "low"
def sequential_experiment(
    *,
    generator: Generator,
    schedule: Schedule,
    treatments: Sequence[str],
    cells: list[Cell] | None = None,
    min_control_weight: float = 0.05,
    min_explore_weight: float = 0.05,
    max_segment_depth: int = 3,
    min_segment_share: float = 0.10,
    calibration: Calibration | None = None,
    graduation_rule: GraduationRule | None = None,
    seed: int = 0,
    progress: bool = False,
) -> SequentialExperiment:
    """Build and return a :class:`SequentialExperiment` — the L1 entry point.

    A function-style spelling of the :class:`SequentialExperiment`
    constructor: it takes the same keyword-only parameters with the same
    defaults and forwards each one unchanged, so the class docstring is
    the reference for what every parameter means. Drive the returned
    instance one round at a time with ``next(exp)``, or call
    ``run_to_completion()`` when the schedule is bounded.

    Scope: this surface assumes designed experiments — the operator emits
    the assignment rules (the plan's cells and policies), the platform
    assigns per those rules, and per-visitor propensities are known and
    recorded. Heavily-confounded observational inference is out of scope
    for this API; for that setting, consider econml or DoubleML.
    """
    experiment = SequentialExperiment(
        generator=generator,
        schedule=schedule,
        treatments=treatments,
        cells=cells,
        min_control_weight=min_control_weight,
        min_explore_weight=min_explore_weight,
        max_segment_depth=max_segment_depth,
        min_segment_share=min_segment_share,
        calibration=calibration,
        graduation_rule=graduation_rule,
        seed=seed,
        progress=progress,
    )
    return experiment
# ---------------------------------------------------------------------------
# Cumulative assembly helpers
# ---------------------------------------------------------------------------
def _build_cumulative(
    rounds: list[ObservedExperimentData],
) -> tuple[ObservedExperimentData, list[np.ndarray]]:
    """Union the rounds' data into one cumulative ``ObservedExperimentData``.

    Variants are the union of the rounds' variant names in first-seen
    order (round 0's order first); each cumulative variant concatenates
    that variant's per-round frames in round order, so the cumulative row
    order is VARIANT-MAJOR: all rounds of variant 0, then all rounds of
    variant 1, and so on. Summary fields are summed across rounds.

    Args:
        rounds: The per-round data, oldest first. ``experiment_id`` and
            ``metric`` of the result are taken from ``rounds[0]``.

    Returns:
        ``(cumulative, positions)`` where ``positions[r][i]`` is the
        cumulative row index of round *r*'s local concatenation row *i*
        (the round's own variant-major order) — the map for re-indexing
        round-aligned artifacts (truth arrays, the ``cell`` column) into
        cumulative order.

    Raises:
        ValueError: When one round lists the same variant name more than
            once — the ``(round, name)`` start offsets would collide and
            the position map would silently overlap.
    """
    # name -> [(round index, that round's VariantData), ...]. Dict
    # insertion order doubles as the first-seen variant order.
    per_variant: dict[str, list[tuple[int, VariantData]]] = {}
    for r, data in enumerate(rounds):
        seen_in_round: set[str] = set()
        for variant in data.variants:
            if variant.name in seen_in_round:
                raise ValueError(
                    f"round {r} data carries variant {variant.name!r} more "
                    "than once; each variant may appear at most once per "
                    "round"
                )
            seen_in_round.add(variant.name)
            per_variant.setdefault(variant.name, []).append((r, variant))

    # Walk the cumulative (variant-major) layout once, recording where
    # each (round, variant) slab starts.
    cum_start: dict[tuple[int, str], int] = {}
    offset = 0
    for name, slabs in per_variant.items():
        for r, variant in slabs:
            cum_start[(r, name)] = offset
            offset += variant.n_visitors

    # For each round, map its own concatenation order (its variants in
    # the order the round lists them) onto cumulative row indices.
    positions = [
        np.concatenate(
            [
                np.arange(
                    cum_start[(r, v.name)],
                    cum_start[(r, v.name)] + v.n_visitors,
                )
                for v in data.variants
            ]
        )
        for r, data in enumerate(rounds)
    ]

    variants = [
        VariantData(
            name=name,
            visitors=pd.concat(
                [v.visitors for _, v in slabs],
                ignore_index=True,
            ),
            n_visitors=sum(v.n_visitors for _, v in slabs),
            n_conversions=sum(v.n_conversions for _, v in slabs),
            total_revenue=float(sum(v.total_revenue for _, v in slabs)),
        )
        for name, slabs in per_variant.items()
    ]
    cumulative = ObservedExperimentData(
        experiment_id=rounds[0].experiment_id,
        metric=rounds[0].metric,
        variants=variants,
    )
    return cumulative, positions
def _cumulative_truth(
    truths: list[CalibrationTruth],
    positions: list[np.ndarray],
    *,
    n_total: int,
) -> CalibrationTruth:
    """Fold the per-round truths into a single cumulative ``CalibrationTruth``.

    Each per-visitor array is aligned to its own round's variant-major
    concatenation; *positions* (one index array per round) says where
    those rows land in the cumulative order, and the arrays are scattered
    there. ``metric_id`` / ``metric_family`` are taken from the first
    round and must agree across rounds; ``effect`` and every
    ``effect_components`` entry become means weighted by each round's row
    count (``len(positions[r])``).

    Raises:
        ValueError: On metric identity mismatch, effect-component key
            mismatch, or a per-visitor field populated in some rounds
            but not others.
    """
    reference = truths[0]
    for later in truths[1:]:
        if later.metric_id != reference.metric_id:
            raise ValueError(
                f"truth metric_id changed between rounds: "
                f"{reference.metric_id!r} vs {later.metric_id!r}"
            )
        if later.metric_family != reference.metric_family:
            raise ValueError(
                f"truth metric_family changed between rounds: "
                f"{reference.metric_family!r} vs {later.metric_family!r}"
            )
        if set(later.effect_components) != set(reference.effect_components):
            raise ValueError(
                f"truth effect_components keys changed between rounds: "
                f"{sorted(reference.effect_components)} vs "
                f"{sorted(later.effect_components)}"
            )

    round_sizes = np.array([len(p) for p in positions], dtype=np.float64)
    denominator = float(round_sizes.sum())

    def weighted_mean(per_round_values: list[float]) -> float:
        # Row-count-weighted mean; strict zip rejects a truths/positions
        # length mismatch.
        numerator = sum(
            size * value
            for size, value in zip(round_sizes, per_round_values, strict=True)
        )
        return float(numerator / denominator)

    effect = weighted_mean([t.effect for t in truths])
    effect_components = {
        key: weighted_mean([t.effect_components[key] for t in truths])
        for key in reference.effect_components
    }

    def scatter(arrays: list[AlignedVisitorArray]) -> AlignedVisitorArray:
        # Write each round's values at that round's cumulative indices.
        merged = np.empty(n_total, dtype=np.float64)
        for index, array in zip(positions, arrays, strict=True):
            merged[index] = np.asarray(array.values, dtype=np.float64)
        return AlignedVisitorArray(values=merged, n_visitors=n_total)

    def gather(field: str) -> list | None:
        # All-or-none across rounds: None when no round populates the
        # field, the per-round entries when every round does.
        per_round = [getattr(t, field) for t in truths]
        absent = [entry is None for entry in per_round]
        if all(absent):
            return None
        if any(absent):
            raise ValueError(
                f"truth field {field!r} is populated in some rounds but "
                "not others"
            )
        return per_round

    def realign(field: str) -> AlignedVisitorArray | None:
        per_round = gather(field)
        return None if per_round is None else scatter(per_round)

    def realign_list(field: str) -> list[AlignedVisitorArray] | None:
        per_round = gather(field)
        if per_round is None:
            return None
        lengths = {len(entry) for entry in per_round}
        if len(lengths) != 1:
            raise ValueError(
                f"truth field {field!r} length changed between rounds: "
                f"{sorted(lengths)}"
            )
        (width,) = lengths
        return [
            scatter([entry[slot] for entry in per_round])
            for slot in range(width)
        ]

    return CalibrationTruth(
        effect=effect,
        metric_id=reference.metric_id,
        metric_family=reference.metric_family,
        effect_components=effect_components,
        cate_per_visitor=realign("cate_per_visitor"),
        conv_cate_per_visitor=realign("conv_cate_per_visitor"),
        aov_cate_per_visitor=realign("aov_cate_per_visitor"),
        p0_per_visitor=realign("p0_per_visitor"),
        p1_per_visitor=realign("p1_per_visitor"),
        m0_per_visitor=realign("m0_per_visitor"),
        m1_per_visitor=realign("m1_per_visitor"),
        contrast_cate_per_visitor=realign_list("contrast_cate_per_visitor"),
        p_per_visitor=realign_list("p_per_visitor"),
        m_per_visitor=realign_list("m_per_visitor"),
    )