Files
chat/chat/web/meanwhile.py
T
Joseph Doherty 456f50d334 feat: branching read-side filter — event readers consult active branch range (T113)
Wire the active branch's [origin_event_id, head_event_id] window into
every user-facing event/memory reader so switching branches actually
changes what dialogue and memories the user sees. Phase 4 T89/T94
shipped branches as metadata-only — this closes the loop.

Helper:
- chat/state/branches.py: add `active_branch_event_ids(conn)` returning
  the active branch's id range, with two defensive fall-throughs to
  `(0, BIG_INT)`: (a) no active branch row at all, and (b) the
  bootstrap "main" sentinel (name="main", origin=0, head=0). Production
  never bumps main's head_event_id today, so this preserves existing
  reader behaviour for every test that doesn't explicitly switch.

Readers updated (all user-facing dialogue / retrieval surfaces):
- chat/services/turn_common.py::read_recent_dialogue — chat-history
  prompt context + the chat-view template path (via web/turns.py +
  web/chat.py).
- chat/services/scene_summarize.py::_read_recent_dialogue — scene-close
  per-POV summary input.
- chat/state/memory.py::search_memories — FTS leg filters via
  m.event_id (T109's column); legacy NULL event_id rows are *included*
  unconditionally so the filter doesn't break pre-0014 retrieval. The
  fused (FTS + RRF + vector) path also drops vector hits whose
  event_id falls outside the branch window.
- chat/web/meanwhile.py::_read_recent_meanwhile_dialogue — meanwhile
  prompt context.

Projector queries (chat/state/world.py et al.) and admin/management
surfaces (drawer hide-panel, cross-chat search, regenerate's row
lookups by id) are intentionally NOT branch-filtered: projection must
see the full log to build state correctly, and the admin surfaces
operate across branches by design.

Tests (10 new, 446 total):
- tests/test_branches_state.py: 3 tests for `active_branch_event_ids`
  itself (bootstrap-main, no-active-branch, non-main literal range).
- tests/test_branching.py: 7 cross-feature tests covering the spec's
  five required scenarios plus scene_summarize and meanwhile readers.
2026-04-27 06:25:22 -04:00

415 lines
16 KiB
Python

"""Meanwhile-mode turn controller (T64).
A meanwhile scene is a private 2-bot scene running alongside an active
you-scene (its parent). The user manually advances it by POSTing to the
existing ``/chats/<id>/turns`` endpoint; the route detects an active
meanwhile scene at the start of ``post_turn`` and dispatches here.
Unlike the normal turn flow, "you" is NOT a witness to the scene. The
controller mirrors ``post_turn`` shape but with:
- Speaker alternation derived from the latest meanwhile ``assistant_turn``
scoped to this scene_id (host first, then alternating).
- Prompt assembly with ``present_set_kind="host_guest"`` so the prompt
builder drops the "you" activity bullet and any speaker -> "you" edge.
- Memory writes via ``record_meanwhile_memory`` — both bots get rows
with witness flags ``[you=0, host=1, guest=1]``.
- State updates over exactly 2 directed pairs (host <-> guest); no
you-related pairs fire.
- The ``assistant_turn`` payload carries ``meanwhile_scene_id`` and
``present_set_kind="host_guest"`` so downstream filters (alternation
lookup, drawer rendering, scene-close detection) can scope to the
meanwhile slice without conflating it with the parent you-scene's
history.
Scene-close detection for meanwhile scenes is not auto-fired here —
T65 covers the close + digest pipeline. The controller's job ends
after the post-turn classifier passes land.
"""
from __future__ import annotations

import asyncio
import json
import logging

from chat.config import Settings
from chat.eventlog.log import append_and_apply, append_event
from chat.llm.client import LLMClient
from chat.services.memory_write import record_meanwhile_memory
from chat.services.multi_state_update import compute_state_updates_for_present
from chat.services.prompt import assemble_narrative_prompt
from chat.services.turn_parse import parse_turn
from chat.state.edges import get_edge
from chat.state.entities import get_bot
from chat.state.meanwhile import list_meanwhile_scenes
from chat.state.world import get_chat
from chat.web.pubsub import publish
from chat.web.render import render_turn_html as _render_turn_html
def _strip_ooc_for_prompt(parsed) -> str:
"""Mirror of the helper in turns.py — concatenate non-OOC segments."""
keep = [s.text for s in parsed.segments if s.kind != "ooc"]
return " ".join(keep).strip()
def _read_recent_meanwhile_dialogue(
    conn, chat_id: str, scene_id: int, limit: int = 50
) -> list[dict]:
    """Return the meanwhile scene's prior turns, oldest first, shaped as
    ``{"speaker": <id>, "text": <prose>}``.

    Selects ``event_log`` rows for ``chat_id`` that are not superseded
    (``superseded_by IS NULL``) and not hidden (``hidden = 0``):

    - every ``user_turn`` / ``user_turn_edit`` row on the chat, rendered as
      ``{"speaker": "you", "text": payload["prose"]}``;
    - only those ``assistant_turn`` rows whose ``meanwhile_scene_id``
      equals ``scene_id``, rendered as
      ``{"speaker": payload["speaker_id"], "text": payload["text"]}``.

    Assistant turns from other meanwhile scenes and from the parent
    you-scene are therefore excluded. User-side rows, however, are matched
    on ``chat_id`` alone. ``process_meanwhile_turn`` does tag its
    ``user_turn`` payloads with ``meanwhile_scene_id``, but this query does
    not filter on that tag. As a result, user prose from the parent
    you-scene, and from other meanwhile scenes on the same chat, is
    included too.
    NOTE(review): confirm that chat-wide user context is intended here.

    Both filters run in SQL via ``json_extract``. The newest ``limit``
    matching rows are taken (``ORDER BY id DESC LIMIT ?``), so filtering
    is not done in Python over an unbounded slice. They are returned in
    ascending id order.

    T113: rows are clamped to the active branch's ``[origin, head]``
    event-id range (``active_branch_event_ids``) so meanwhile prompt
    context respects the user's current branch. Bootstrap-main and "no
    active branch" both fall through to ``(0, BIG_INT)``, so there is no
    functional change for the metadata-only Phase 4 era.

    Args:
        conn: database connection holding ``event_log``.
        chat_id: chat whose log is read.
        scene_id: meanwhile scene whose assistant turns are kept.
        limit: maximum number of rows returned.

    Returns:
        Dialogue entries in chronological order.
    """
    # Call-time import; the reason for not importing at module level
    # (e.g. an import cycle) isn't visible from this module.
    from chat.state.branches import active_branch_event_ids

    origin, head = active_branch_event_ids(conn)
    rows = conn.execute(
        "SELECT id, kind, payload_json FROM event_log "
        "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
        " AND superseded_by IS NULL AND hidden = 0 "
        " AND id BETWEEN ? AND ? "
        " AND json_extract(payload_json, '$.chat_id') = ? "
        " AND ("
        " kind IN ('user_turn', 'user_turn_edit') "
        " OR json_extract(payload_json, '$.meanwhile_scene_id') = ?"
        " ) "
        "ORDER BY id DESC LIMIT ?",
        (origin, head, chat_id, scene_id, limit),
    ).fetchall()

    out: list[dict] = []
    # The query yields newest-first; callers expect chronological order.
    for _row_id, kind, payload_json in reversed(rows):
        payload = json.loads(payload_json)
        if kind in ("user_turn", "user_turn_edit"):
            out.append({"speaker": "you", "text": payload.get("prose", "")})
        else:
            out.append(
                {
                    "speaker": payload.get("speaker_id", "bot"),
                    "text": payload.get("text", ""),
                }
            )
    return out
def _last_meanwhile_speaker(conn, chat_id: str, scene_id: int) -> str | None:
"""Return the speaker_id of the latest assistant_turn linked to
``scene_id`` for ``chat_id``, or ``None`` if no prior turn exists.
Used to alternate the speaker across consecutive meanwhile turns —
the OTHER bot speaks next. Pushes both filters into SQL via
``json_extract`` and bounds with ``LIMIT 1`` so SQLite stops at the
first match instead of scanning the whole assistant_turn slice.
"""
row = conn.execute(
"SELECT json_extract(payload_json, '$.speaker_id') AS speaker "
"FROM event_log "
"WHERE kind = 'assistant_turn' "
" AND superseded_by IS NULL AND hidden = 0 "
" AND json_extract(payload_json, '$.chat_id') = ? "
" AND json_extract(payload_json, '$.meanwhile_scene_id') = ? "
"ORDER BY id DESC "
"LIMIT 1",
(chat_id, scene_id),
).fetchone()
return row[0] if row else None
async def process_meanwhile_turn(
    conn,
    client: LLMClient,
    settings: Settings,
    *,
    chat_id: str,
    prose: str,
    app=None,
) -> dict:
    """Run one meanwhile turn end-to-end.

    Steps, in order:

    - parse the user's prose and append it as a ``user_turn``;
    - pick the next speaker by alternating with the scene's previous
      ``assistant_turn``;
    - stream the narrative beat over SSE and append it as an
      ``assistant_turn``;
    - write witness memories for both bots, unless the stream was
      cancelled or produced no text;
    - run the host <-> guest state-update classifier;
    - publish the completion events.

    Args:
        conn: database connection used for every event-log read and append.
        client: LLM client for the classifier passes and the narrative
            stream.
        settings: supplies model names, token budgets, temperature and the
            classifier timeout.
        chat_id: chat whose active meanwhile scene is being advanced.
        prose: raw user prose for this beat (may contain OOC segments).
        app: passed through unchanged to ``record_meanwhile_memory``.

    Returns:
        ``{"assistant_text": ..., "speaker_id": ..., "scene_id": ...,
        "user_turn_id": ..., "assistant_event_id": ...}`` so callers can
        introspect the produced beat (the HTTP route maps to a ``204``; a
        future SSE rebroadcast may use the dict directly).

    Raises:
        ValueError: the chat does not exist, it has no active meanwhile
            scene, the chat has no ``guest_bot_id``, or either bot row is
            missing. The caller (turns.py) only dispatches here after a
            positive ``list_meanwhile_scenes`` lookup, but the defensive
            raise keeps the controller usable in isolation.
        asyncio.CancelledError: re-raised at the end when the streaming
            task was cancelled, after the partial beat has been recorded
            and broadcast.
    """
    chat = get_chat(conn, chat_id)
    if chat is None:
        raise ValueError(f"chat not found: {chat_id}")
    scenes = list_meanwhile_scenes(conn, chat_id, status="active")
    if not scenes:
        raise ValueError(f"no active meanwhile scene on chat: {chat_id}")
    # NOTE(review): with several active scenes the first one listed wins;
    # the ordering is whatever list_meanwhile_scenes returns.
    scene = scenes[0]
    scene_id = scene["id"]
    host_bot_id = chat["host_bot_id"]
    guest_bot_id = chat.get("guest_bot_id")
    if guest_bot_id is None:
        # A meanwhile scene without a guest is a schema violation —
        # the projector requires both ids on meanwhile_scene_started.
        raise ValueError(
            f"meanwhile scene {scene_id} on chat {chat_id} lacks a guest"
        )
    host_bot = get_bot(conn, host_bot_id)
    guest_bot = get_bot(conn, guest_bot_id)
    if host_bot is None or guest_bot is None:
        raise ValueError(
            f"meanwhile bots missing: host={host_bot_id} guest={guest_bot_id}"
        )

    # 1. Parse the user prose with the classifier — same shape as the
    # normal turn flow so OOC-stripping, segment-typing, etc. all work.
    parsed = await parse_turn(
        client, model=settings.classifier_model, prose=prose
    )
    prompt_prose = _strip_ooc_for_prompt(parsed)

    # 2. Append user_turn — kept on the chat-wide log so the user can
    # see their own prose in the timeline. Tagged with the meanwhile
    # scene_id so future renderers can group it with the right scene.
    user_turn_event_id = append_event(
        conn,
        kind="user_turn",
        payload={
            "chat_id": chat_id,
            "prose": prose,
            "segments": [s.model_dump() for s in parsed.segments],
            "meanwhile_scene_id": scene_id,
        },
    )

    # 3. Alternate the speaker. First turn -> host speaks; each
    # subsequent turn -> the OTHER bot from the previous beat. Lookup
    # is scoped by ``meanwhile_scene_id`` so unrelated assistant_turns
    # on the same chat don't perturb the alternation.
    last_speaker = _last_meanwhile_speaker(conn, chat_id, scene_id)
    if last_speaker is None or last_speaker == guest_bot_id:
        speaker_bot = host_bot
        addressee_bot = guest_bot
    else:
        speaker_bot = guest_bot
        addressee_bot = host_bot

    # 4. Placeholder marker so SSE observers see "in flight". No
    # projector handler is registered for this kind — it's transcript-
    # only, same as the normal turn flow.
    append_event(
        conn,
        kind="assistant_turn_started",
        payload={
            "chat_id": chat_id,
            "speaker_id": speaker_bot["id"],
            "user_turn_id": user_turn_event_id,
            "meanwhile_scene_id": scene_id,
        },
    )

    # 5. Build the narrative prompt. ``present_set_kind="host_guest"``
    # tells the assembler to drop the "you" activity bullet and any
    # speaker -> "you" edge — both irrelevant inside a private beat.
    # Addressee is the OTHER bot, not "you".
    recent_dialogue = _read_recent_meanwhile_dialogue(conn, chat_id, scene_id)
    # The user_turn appended in step 2 is usually the newest row; drop it
    # here because it is passed separately as ``user_turn_prose``.
    if recent_dialogue and recent_dialogue[-1].get("speaker") == "you":
        recent_dialogue = recent_dialogue[:-1]
    messages = assemble_narrative_prompt(
        conn,
        chat_id=chat_id,
        speaker_bot_id=speaker_bot["id"],
        addressee=addressee_bot["id"],
        user_turn_prose=prompt_prose if prompt_prose else None,
        recent_dialogue=recent_dialogue,
        budget_soft=settings.narrative_budget_soft,
        budget_hard=settings.narrative_budget_hard,
        guest_id=guest_bot_id,
        present_set_kind="host_guest",
    )

    # 6. Stream + accumulate. Same SSE pattern as the normal flow —
    # tokens publish under the speaker's id so the UI can label the
    # right bubble. Register the streaming task in the chat-keyed
    # in-flight registry so POST /chats/<id>/turns/cancel can call
    # ``.cancel()`` on it; without this, the Stop button is a no-op for
    # meanwhile beats. We import the underscore name from turns.py
    # deliberately — it's the same single-process registry the cancel
    # route reads, and exposing it via a public alias would require
    # touching every existing call site for no behavioural gain.
    from chat.web.turns import _in_flight_tasks  # noqa: PLC0415

    accumulated: list[str] = []
    truncated = False
    cancelled = False

    async def _stream() -> None:
        async for chunk in client.stream(
            messages,
            model=settings.narrative_model,
            max_tokens=settings.narrative_max_tokens,
            temperature=settings.narrative_temperature,
        ):
            accumulated.append(chunk)
            await publish(
                chat_id,
                {
                    "event": "token",
                    "text": chunk,
                    "speaker_id": speaker_bot["id"],
                },
            )

    stream_task = asyncio.create_task(_stream())
    _in_flight_tasks[chat_id] = stream_task
    try:
        await stream_task
    except asyncio.CancelledError:
        truncated = True
        cancelled = True
    except Exception:
        # Best-effort: a failed stream still records whatever text arrived
        # (flagged ``truncated``) instead of aborting the turn. Log it so
        # the failure is diagnosable rather than silently swallowed.
        logging.getLogger(__name__).exception(
            "meanwhile stream failed for chat %s scene %s", chat_id, scene_id
        )
        truncated = True
    finally:
        # Always unregister so a subsequent turn can register a fresh
        # task (mirrors the cleanup in turns.py::post_turn). Only remove
        # the entry if it is still OUR task: if a newer turn on this chat
        # registered its own task meanwhile, popping unconditionally
        # would make Stop a no-op for that newer turn.
        if _in_flight_tasks.get(chat_id) is stream_task:
            _in_flight_tasks.pop(chat_id, None)
    text = "".join(accumulated)

    # 7. Append assistant_turn — tagged with meanwhile_scene_id so the
    # next turn's alternation lookup can find it, and present_set_kind
    # so downstream renderers / digesters can filter scope.
    assistant_event_id = append_event(
        conn,
        kind="assistant_turn",
        payload={
            "chat_id": chat_id,
            "speaker_id": speaker_bot["id"],
            "text": text,
            "truncated": truncated,
            "user_turn_id": user_turn_event_id,
            "meanwhile_scene_id": scene_id,
            "present_set_kind": "host_guest",
        },
    )

    # 8. Per-turn memory writes — both bots get a row with witness flags
    # [you=0, host=1, guest=1]. Skipped on cancellation so we don't
    # record memory for a partial beat the user never read.
    if not cancelled and text.strip():
        record_meanwhile_memory(
            conn,
            chat_id=chat_id,
            host_bot_id=host_bot_id,
            guest_bot_id=guest_bot_id,
            narrative_text=text,
            scene_id=scene_id,
            chat_clock_at=chat.get("time"),
            app=app,
        )

    # 9. Post-turn state-update — exactly 2 directed pairs over the
    # bot pair. No you-related pairs fire (you isn't present).
    # ``recent_dialogue`` was read before this beat was generated, and
    # this turn's user line was trimmed from it in step 5. On its own,
    # the classifier would therefore never see the beat it is meant to
    # score; on a scene's first beat it would see no bot dialogue at all.
    # Append the OOC-stripped user prose and the generated text. The text
    # is skipped on cancellation, matching the memory-write gate above.
    post_turn_dialogue = list(recent_dialogue)
    if prompt_prose:
        post_turn_dialogue.append({"speaker": "you", "text": prompt_prose})
    if not cancelled and text.strip():
        post_turn_dialogue.append({"speaker": speaker_bot["id"], "text": text})

    present_ids = [host_bot_id, guest_bot_id]
    present_names = {
        host_bot_id: host_bot["name"],
        guest_bot_id: guest_bot["name"],
    }
    personas = {
        host_bot_id: host_bot.get("persona") or "",
        guest_bot_id: guest_bot.get("persona") or "",
    }
    prior_edges: dict[tuple[str, str], dict] = {}
    for src in present_ids:
        for tgt in present_ids:
            if src == tgt:
                continue
            # Neutral defaults when the pair has no edge row yet.
            edge = get_edge(conn, src, tgt) or {
                "affinity": 50,
                "trust": 50,
                "summary": "",
            }
            prior_edges[(src, tgt)] = edge
    state_updates = await compute_state_updates_for_present(
        client,
        classifier_model=settings.classifier_model,
        present_ids=present_ids,
        present_names=present_names,
        personas=personas,
        prior_edges=prior_edges,
        recent_dialogue=post_turn_dialogue,
        timeout_s=settings.classifier_timeout_s,
    )
    last_at = chat.get("time")
    for src_id, tgt_id, update in state_updates:
        append_and_apply(
            conn,
            kind="edge_update",
            payload={
                "source_id": src_id,
                "target_id": tgt_id,
                "chat_id": chat_id,
                "affinity_delta": update.affinity_delta,
                "trust_delta": update.trust_delta,
                "knowledge_facts": update.knowledge_facts,
                "last_interaction_at": last_at,
                "last_interaction_chat_id": chat_id,
            },
        )

    # 10. SSE broadcast for the timeline UI — completion event + an HTML
    # fragment for the HTMX SSE swap. Same pattern as the normal turn
    # flow so the rendered transcript shows the meanwhile beat inline.
    await publish(
        chat_id,
        {
            "event": "assistant_turn_complete",
            "speaker_id": speaker_bot["id"],
            "text": text,
            "truncated": truncated,
        },
    )
    turn_html = _render_turn_html(
        speaker_bot["name"],
        text,
        role="bot",
        event_id=assistant_event_id,
    )
    await publish(chat_id, {"event": "turn_html", "data": turn_html})

    if cancelled:
        # Re-raise after the partial-turn has been recorded so callers
        # see the cancel propagate (mirrors normal turn flow).
        raise asyncio.CancelledError

    return {
        "assistant_text": text,
        "speaker_id": speaker_bot["id"],
        "scene_id": scene_id,
        "user_turn_id": user_turn_event_id,
        "assistant_event_id": assistant_event_id,
    }
# Public surface of this module: only the turn controller. The
# underscore-prefixed helpers above are module-private.
__all__ = ["process_meanwhile_turn"]