From cf43ba09930e0e631c866c185dc2c9b0470e4710 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 21:00:56 -0400 Subject: [PATCH] feat: meanwhile turn flow (host+guest, no you) (T64) --- chat/services/memory_write.py | 57 +++ chat/services/prompt.py | 26 +- chat/web/meanwhile.py | 398 +++++++++++++++++++++ chat/web/turns.py | 25 ++ tests/test_meanwhile_turn_flow.py | 560 ++++++++++++++++++++++++++++++ 5 files changed, 1061 insertions(+), 5 deletions(-) create mode 100644 chat/web/meanwhile.py create mode 100644 tests/test_meanwhile_turn_flow.py diff --git a/chat/services/memory_write.py b/chat/services/memory_write.py index 0a6f9b1..6fc6ecf 100644 --- a/chat/services/memory_write.py +++ b/chat/services/memory_write.py @@ -176,3 +176,60 @@ def record_turn_memory_for_present( significance=significance, ) return result + + +def record_meanwhile_memory( + conn: Connection, + *, + chat_id: str, + host_bot_id: str, + guest_bot_id: str, + narrative_text: str, + scene_id: int | None = None, + chat_clock_at: str | None = None, + source: str = "direct", + significance: int = 1, +) -> dict[str, tuple[int, int | None]]: + """Write per-POV ``memory_written`` events for a meanwhile turn (T64). + + A meanwhile scene runs entirely between host + guest, with "you" + absent. Both bots are present witnesses, so each one gets a row with + witness flags ``[you=0, host=1, guest=1]`` — different from the + normal-turn ``record_turn_memory_for_present`` shape, which assumes + the user is always a witness (``witness_you=1``). + + The ``guest_bot_id`` is required (a meanwhile scene by definition + has both bots) — callers passing ``None`` is a programming error. + + Returns ``{bot_id: (event_id, memory_id)}`` mirroring + :func:`record_turn_memory_for_present` so downstream queues + (significance scoring) can pull memory ids without re-querying. + """ + result: dict[str, tuple[int, int | None]] = {} + result[host_bot_id] = _write_one_memory( + conn, + owner_id=host_bot_id, + chat_id=chat_id, + narrative_text=narrative_text, + witness_you=0, + witness_host=1, + witness_guest=1, + scene_id=scene_id, + chat_clock_at=chat_clock_at, + source=source, + significance=significance, + ) + result[guest_bot_id] = _write_one_memory( + conn, + owner_id=guest_bot_id, + chat_id=chat_id, + narrative_text=narrative_text, + witness_you=0, + witness_host=1, + witness_guest=1, + scene_id=scene_id, + chat_clock_at=chat_clock_at, + source=source, + significance=significance, + ) + return result diff --git a/chat/services/prompt.py b/chat/services/prompt.py index 6f836dc..eeb5bf2 100644 --- a/chat/services/prompt.py +++ b/chat/services/prompt.py @@ -393,6 +393,7 @@ def assemble_narrative_prompt( budget_hard: int = 8000, encoding_name: str = "cl100k_base", guest_id: str | None = None, + present_set_kind: str = "you_host", ) -> list[Message]: """Assemble the narrative prompt for ``speaker_bot_id`` to respond. @@ -431,6 +432,14 @@ def assemble_narrative_prompt( you = get_you(conn) addressee_id, addressee_name = _resolve_addressee(conn, addressee, you) + # T64: meanwhile-mode marker. When present_set_kind == "host_guest" + # the user ("you") is NOT a witness in the scene — bots speak only to + # each other. The local flag below is consumed by the activity-block + # builder (skip the "you" bullet entirely) and the other-edges filter + # (drop any speaker -> "you" rendering). Default "you_host" preserves + # the Phase 1/2/3 behavior for normal turns. + _exclude_you = present_set_kind == "host_guest" + # ---- Build all components as text strings ------------------------------ speaker_identity = _build_speaker_identity(bot) @@ -453,10 +462,11 @@ def assemble_narrative_prompt( # header that Phase 2 T43 introduced (read by some LLMs as a # duplicate-section bug). you_activity: dict | None = None - you_act = get_activity(conn, "you") - if you_act is not None: - you_activity = dict(you_act) - you_activity["_display_name"] = (you or {}).get("name") or "you" + if not _exclude_you: + you_act = get_activity(conn, "you") + if you_act is not None: + you_activity = dict(you_act) + you_activity["_display_name"] = (you or {}).get("name") or "you" speaker_activity: dict | None = None bot_act = get_activity(conn, speaker_bot_id) @@ -530,9 +540,15 @@ def assemble_narrative_prompt( container = get_container(conn, scene["container_id"]) scene_block = _build_scene_block(chat, container, scene) - # Other edges: speaker → non-addressee. + # Other edges: speaker → non-addressee. In meanwhile mode (host_guest) + # the speaker -> "you" edge is filtered out as well — "you" isn't + # part of the present set, so surfacing the speaker's relationship + # to the user inside a private bot-to-bot beat would leak context + # the bots aren't supposed to be drawing on right now. all_outgoing = list_edges_for(conn, speaker_bot_id) other_edges_raw = [e for e in all_outgoing if e.get("target_id") != addressee_id] + if _exclude_you: + other_edges_raw = [e for e in other_edges_raw if e.get("target_id") != "you"] for e in other_edges_raw: tid = e.get("target_id") if tid == "you": diff --git a/chat/web/meanwhile.py b/chat/web/meanwhile.py new file mode 100644 index 0000000..1b04a73 --- /dev/null +++ b/chat/web/meanwhile.py @@ -0,0 +1,398 @@ +"""Meanwhile-mode turn controller (T64). + +A meanwhile scene is a private 2-bot scene running alongside an active +you-scene (its parent). The user manually advances it by POSTing to the +existing ``/chats//turns`` endpoint; the route detects an active +meanwhile scene at the start of ``post_turn`` and dispatches here. + +Unlike the normal turn flow, "you" is NOT a witness to the scene. The +controller mirrors ``post_turn`` shape but with: + +- Speaker alternation derived from the latest meanwhile ``assistant_turn`` + scoped to this scene_id (host first, then alternating). +- Prompt assembly with ``present_set_kind="host_guest"`` so the prompt + builder drops the "you" activity bullet and any speaker -> "you" edge. +- Memory writes via ``record_meanwhile_memory`` — both bots get rows + with witness flags ``[you=0, host=1, guest=1]``. +- State updates over exactly 2 directed pairs (host <-> guest); no + you-related pairs fire. +- The ``assistant_turn`` payload carries ``meanwhile_scene_id`` and + ``present_set_kind="host_guest"`` so downstream filters (alternation + lookup, drawer rendering, scene-close detection) can scope to the + meanwhile slice without conflating it with the parent you-scene's + history. + +Scene-close detection for meanwhile scenes is not auto-fired here — +T65 covers the close + digest pipeline. The controller's job ends +after the post-turn classifier passes land. +""" + +from __future__ import annotations + +import asyncio +import json + +from chat.config import Settings +from chat.eventlog.log import append_and_apply, append_event +from chat.llm.client import LLMClient +from chat.services.memory_write import record_meanwhile_memory +from chat.services.multi_state_update import compute_state_updates_for_present +from chat.services.prompt import assemble_narrative_prompt +from chat.services.turn_parse import parse_turn +from chat.state.edges import get_edge +from chat.state.entities import get_bot +from chat.state.meanwhile import list_meanwhile_scenes +from chat.state.world import get_chat +from chat.web.pubsub import publish +from chat.web.render import render_turn_html as _render_turn_html + + +def _strip_ooc_for_prompt(parsed) -> str: + """Mirror of the helper in turns.py — concatenate non-OOC segments.""" + keep = [s.text for s in parsed.segments if s.kind != "ooc"] + return " ".join(keep).strip() + + +def _read_recent_meanwhile_dialogue( + conn, chat_id: str, scene_id: int, limit: int = 50 +) -> list[dict]: + """Return the meanwhile scene's prior turns shaped as + ``{"speaker": , "text": }``. + + Pulls ``user_turn`` rows for the chat (the user-side prose driving + this meanwhile scene rides through the same chat) plus only those + ``assistant_turn`` rows whose ``meanwhile_scene_id`` matches the + given scene id. Other meanwhile scenes on the same chat — and the + parent you-scene's assistant_turns — are excluded so the prompt + context stays scoped to the private beat. + + Filters chat_id (and meanwhile_scene_id for assistant_turn) via + ``json_extract`` in SQL so SQLite stops at the first ``limit`` rows + that already match — avoids an unbounded scan as ``event_log`` + grows. The user-side rows match on chat_id only since they aren't + tagged with a scene id (they ride the chat-wide log). + """ + cur = conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') " + " AND superseded_by IS NULL AND hidden = 0 " + " AND json_extract(payload_json, '$.chat_id') = ? " + " AND (" + " kind IN ('user_turn', 'user_turn_edit') " + " OR json_extract(payload_json, '$.meanwhile_scene_id') = ?" + " ) " + "ORDER BY id DESC LIMIT ?", + (chat_id, scene_id, limit), + ) + rows = cur.fetchall() + rows.reverse() + out: list[dict] = [] + for _row_id, kind, payload_json in rows: + p = json.loads(payload_json) + if kind in ("user_turn", "user_turn_edit"): + out.append({"speaker": "you", "text": p.get("prose", "")}) + else: + out.append( + { + "speaker": p.get("speaker_id", "bot"), + "text": p.get("text", ""), + } + ) + return out + + +def _last_meanwhile_speaker(conn, chat_id: str, scene_id: int) -> str | None: + """Return the speaker_id of the latest assistant_turn linked to + ``scene_id`` for ``chat_id``, or ``None`` if no prior turn exists. + + Used to alternate the speaker across consecutive meanwhile turns — + the OTHER bot speaks next. Pushes both filters into SQL via + ``json_extract`` and bounds with ``LIMIT 1`` so SQLite stops at the + first match instead of scanning the whole assistant_turn slice. + """ + row = conn.execute( + "SELECT json_extract(payload_json, '$.speaker_id') AS speaker " + "FROM event_log " + "WHERE kind = 'assistant_turn' " + " AND superseded_by IS NULL AND hidden = 0 " + " AND json_extract(payload_json, '$.chat_id') = ? " + " AND json_extract(payload_json, '$.meanwhile_scene_id') = ? " + "ORDER BY id DESC " + "LIMIT 1", + (chat_id, scene_id), + ).fetchone() + return row[0] if row else None + + +async def process_meanwhile_turn( + conn, + client: LLMClient, + settings: Settings, + *, + chat_id: str, + prose: str, +) -> dict: + """Run one meanwhile turn end-to-end. + + Returns a small dict shape ``{"assistant_text": ..., "speaker_id": + ..., "scene_id": ..., "user_turn_id": ..., "assistant_event_id": + ...}`` so callers can introspect the produced beat (HTTP route maps + to a ``204``; future SSE rebroadcast may use the dict directly). + + Raises ``ValueError`` when there is no active meanwhile scene on + ``chat_id`` — the caller (turns.py) only dispatches here after a + positive ``list_meanwhile_scenes`` lookup, but the defensive raise + keeps the controller usable in isolation. + """ + chat = get_chat(conn, chat_id) + if chat is None: + raise ValueError(f"chat not found: {chat_id}") + + scenes = list_meanwhile_scenes(conn, chat_id, status="active") + if not scenes: + raise ValueError(f"no active meanwhile scene on chat: {chat_id}") + scene = scenes[0] + scene_id = scene["id"] + + host_bot_id = chat["host_bot_id"] + guest_bot_id = chat.get("guest_bot_id") + if guest_bot_id is None: + # A meanwhile scene without a guest is a schema violation — + # the projector requires both ids on meanwhile_scene_started. + raise ValueError( + f"meanwhile scene {scene_id} on chat {chat_id} lacks a guest" + ) + + host_bot = get_bot(conn, host_bot_id) + guest_bot = get_bot(conn, guest_bot_id) + if host_bot is None or guest_bot is None: + raise ValueError( + f"meanwhile bots missing: host={host_bot_id} guest={guest_bot_id}" + ) + + # 1. Parse the user prose with the classifier — same shape as the + # normal turn flow so OOC-stripping, segment-typing, etc. all work. + parsed = await parse_turn( + client, model=settings.classifier_model, prose=prose + ) + prompt_prose = _strip_ooc_for_prompt(parsed) + + # 2. Append user_turn — kept on the chat-wide log so the user can + # see their own prose in the timeline. Tagged with the meanwhile + # scene_id so future renderers can group it with the right scene. + user_turn_event_id = append_event( + conn, + kind="user_turn", + payload={ + "chat_id": chat_id, + "prose": prose, + "segments": [s.model_dump() for s in parsed.segments], + "meanwhile_scene_id": scene_id, + }, + ) + + # 3. Alternate the speaker. First turn -> host speaks; each + # subsequent turn -> the OTHER bot from the previous beat. Lookup + # is scoped by ``meanwhile_scene_id`` so unrelated assistant_turns + # on the same chat don't perturb the alternation. + last_speaker = _last_meanwhile_speaker(conn, chat_id, scene_id) + if last_speaker is None or last_speaker == guest_bot_id: + speaker_bot = host_bot + addressee_bot = guest_bot + else: + speaker_bot = guest_bot + addressee_bot = host_bot + + # 4. Placeholder marker so SSE observers see "in flight". No + # projector handler is registered for this kind — it's transcript- + # only, same as the normal turn flow. + append_event( + conn, + kind="assistant_turn_started", + payload={ + "chat_id": chat_id, + "speaker_id": speaker_bot["id"], + "user_turn_id": user_turn_event_id, + "meanwhile_scene_id": scene_id, + }, + ) + + # 5. Build the narrative prompt. ``present_set_kind="host_guest"`` + # tells the assembler to drop the "you" activity bullet and any + # speaker -> "you" edge — both irrelevant inside a private beat. + # Addressee is the OTHER bot, not "you". + recent_dialogue = _read_recent_meanwhile_dialogue(conn, chat_id, scene_id) + if recent_dialogue and recent_dialogue[-1].get("speaker") == "you": + recent_dialogue = recent_dialogue[:-1] + messages = assemble_narrative_prompt( + conn, + chat_id=chat_id, + speaker_bot_id=speaker_bot["id"], + addressee=addressee_bot["id"], + user_turn_prose=prompt_prose if prompt_prose else None, + recent_dialogue=recent_dialogue, + budget_soft=settings.narrative_budget_soft, + budget_hard=settings.narrative_budget_hard, + guest_id=guest_bot_id, + present_set_kind="host_guest", + ) + + # 6. Stream + accumulate. Same SSE pattern as the normal flow — + # tokens publish under the speaker's id so the UI can label the + # right bubble. Register the streaming task in the chat-keyed + # in-flight registry so POST /chats//turns/cancel can call + # ``.cancel()`` on it; without this, the Stop button is a no-op for + # meanwhile beats. We import the underscore name from turns.py + # deliberately — it's the same single-process registry the cancel + # route reads, and exposing it via a public alias would require + # touching every existing call site for no behavioural gain. + from chat.web.turns import _in_flight_tasks # noqa: PLC0415 + + accumulated: list[str] = [] + truncated = False + cancelled = False + + async def _stream() -> None: + async for chunk in client.stream( + messages, + model=settings.narrative_model, + max_tokens=settings.narrative_max_tokens, + temperature=settings.narrative_temperature, + ): + accumulated.append(chunk) + await publish( + chat_id, + { + "event": "token", + "text": chunk, + "speaker_id": speaker_bot["id"], + }, + ) + + stream_task = asyncio.create_task(_stream()) + _in_flight_tasks[chat_id] = stream_task + try: + await stream_task + except asyncio.CancelledError: + truncated = True + cancelled = True + except Exception: + truncated = True + finally: + # Always unregister so a subsequent turn can register a fresh + # task. Mirrors the cleanup in turns.py::post_turn. + _in_flight_tasks.pop(chat_id, None) + + text = "".join(accumulated) + + # 7. Append assistant_turn — tagged with meanwhile_scene_id so the + # next turn's alternation lookup can find it, and present_set_kind + # so downstream renderers / digesters can filter scope. + assistant_event_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": chat_id, + "speaker_id": speaker_bot["id"], + "text": text, + "truncated": truncated, + "user_turn_id": user_turn_event_id, + "meanwhile_scene_id": scene_id, + "present_set_kind": "host_guest", + }, + ) + + # 8. Per-turn memory writes — both bots get a row with witness flags + # [you=0, host=1, guest=1]. Skipped on cancellation so we don't + # record memory for a partial beat the user never read. + if not cancelled and text.strip(): + record_meanwhile_memory( + conn, + chat_id=chat_id, + host_bot_id=host_bot_id, + guest_bot_id=guest_bot_id, + narrative_text=text, + scene_id=scene_id, + chat_clock_at=chat.get("time"), + ) + + # 9. Post-turn state-update — exactly 2 directed pairs over the + # bot pair. No you-related pairs fire (you isn't present). + present_ids = [host_bot_id, guest_bot_id] + present_names = { + host_bot_id: host_bot["name"], + guest_bot_id: guest_bot["name"], + } + personas = { + host_bot_id: host_bot.get("persona") or "", + guest_bot_id: guest_bot.get("persona") or "", + } + prior_edges: dict[tuple[str, str], dict] = {} + for src in present_ids: + for tgt in present_ids: + if src == tgt: + continue + edge = get_edge(conn, src, tgt) or { + "affinity": 50, + "trust": 50, + "summary": "", + } + prior_edges[(src, tgt)] = edge + + state_updates = await compute_state_updates_for_present( + client, + classifier_model=settings.classifier_model, + present_ids=present_ids, + present_names=present_names, + personas=personas, + prior_edges=prior_edges, + recent_dialogue=recent_dialogue, + timeout_s=settings.classifier_timeout_s, + ) + last_at = chat.get("time") + for src_id, tgt_id, update in state_updates: + append_and_apply( + conn, + kind="edge_update", + payload={ + "source_id": src_id, + "target_id": tgt_id, + "chat_id": chat_id, + "affinity_delta": update.affinity_delta, + "trust_delta": update.trust_delta, + "knowledge_facts": update.knowledge_facts, + "last_interaction_at": last_at, + "last_interaction_chat_id": chat_id, + }, + ) + + # 10. SSE broadcast for the timeline UI — completion event + an HTML + # fragment for the HTMX SSE swap. Same pattern as the normal turn + # flow so the rendered transcript shows the meanwhile beat inline. + await publish( + chat_id, + { + "event": "assistant_turn_complete", + "speaker_id": speaker_bot["id"], + "text": text, + "truncated": truncated, + }, + ) + turn_html = _render_turn_html(speaker_bot["name"], text, role="bot") + await publish(chat_id, {"event": "turn_html", "data": turn_html}) + + if cancelled: + # Re-raise after the partial-turn has been recorded so callers + # see the cancel propagate (mirrors normal turn flow). + raise asyncio.CancelledError + + return { + "assistant_text": text, + "speaker_id": speaker_bot["id"], + "scene_id": scene_id, + "user_turn_id": user_turn_event_id, + "assistant_event_id": assistant_event_id, + } + + +__all__ = ["process_meanwhile_turn"] diff --git a/chat/web/turns.py b/chat/web/turns.py index 5d2ff94..3fc51d4 100644 --- a/chat/web/turns.py +++ b/chat/web/turns.py @@ -72,9 +72,11 @@ from chat.services.turn_parse import ParsedTurn, parse_turn from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you from chat.state.events import list_active_events +from chat.state.meanwhile import list_meanwhile_scenes from chat.state.world import active_scene, get_chat, get_container from chat.web.bots import get_conn from chat.web.kickoff import get_llm_client +from chat.web.meanwhile import process_meanwhile_turn from chat.web.pubsub import publish from chat.web.render import render_turn_html as _render_turn_html from chat.web.skip import _parse_iso_time, process_elision_skip @@ -251,6 +253,29 @@ async def post_turn( settings = request.app.state.settings + # 0. Meanwhile-mode short-circuit (T64). When an active meanwhile + # scene is running on this chat, the turn flow is entirely between + # the two bots — "you" is absent. The meanwhile controller mirrors + # the post_turn shape but with no-you semantics: present_set_kind + # ``host_guest`` in the prompt assembler, ``record_meanwhile_memory`` + # for witness flags, only 2 directed pairs in the state update, and + # the assistant_turn payload tagged with ``meanwhile_scene_id`` so + # alternation lookups can scope to this scene specifically. The + # T62 skip-intent dispatch and the regular narrative path below + # are skipped — a meanwhile beat is its own self-contained flow. + if list_meanwhile_scenes(conn, chat_id, status="active"): + try: + await process_meanwhile_turn( + conn, + client, + settings, + chat_id=chat_id, + prose=prose, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + return Response(status_code=204) + # 1. Parse turn (classifier). parsed = await parse_turn( client, model=settings.classifier_model, prose=prose diff --git a/tests/test_meanwhile_turn_flow.py b/tests/test_meanwhile_turn_flow.py new file mode 100644 index 0000000..a2e2783 --- /dev/null +++ b/tests/test_meanwhile_turn_flow.py @@ -0,0 +1,560 @@ +"""Meanwhile-mode turn flow (T64). + +A meanwhile scene runs entirely between two bots — host + guest — with +"you" absent. The user manually advances the scene by POSTing prose to +the existing ``/chats//turns`` endpoint; the route detects the active +meanwhile scene at the start of ``post_turn`` and dispatches to the +``process_meanwhile_turn`` controller in ``chat/web/meanwhile.py``. + +Coverage: + +1. Memory writes for a meanwhile turn carry witness ``[you=0, host=1, + guest=1]`` for both the host's and the guest's per-POV memory rows. +2. State updates after a meanwhile turn run for exactly 2 directed pairs + (host -> guest, guest -> host) — no you-related pairs fire. +3. Speakers alternate across consecutive meanwhile turns: the host + speaks first (no prior meanwhile assistant_turn), the guest speaks + second (the prior turn's speaker was the host, so this turn's + speaker is the OTHER bot). +4. Scene-close on a meanwhile scene writes per-POV summaries for host + + guest only — no "you" POV row is written, mirroring the no-you + present_set of the meanwhile scene. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +from fastapi.testclient import TestClient + +from chat.app import app +from chat.db.connection import open_db +from chat.eventlog.log import append_event +from chat.eventlog.projector import project +from chat.llm.mock import MockLLMClient +import chat.state.meanwhile # noqa: F401 (registers handlers) + + +def _bot_payload(bot_id: str, name: str) -> dict: + return { + "id": bot_id, + "name": name, + "persona": f"persona for {name}", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "", + "kickoff_prose": "...", + } + + +def _seed_meanwhile_chat(db_path: Path) -> None: + """Seed two bots, you, a chat with both wired in, an open parent + you-scene, AND an active meanwhile child scene with bot_a + bot_b. + + Edges are seeded for both directed pairs between bot_a and bot_b at + schema-default 50/50 so post-turn state-update writes land cleanly. + Activities for both bots are recorded so the prompt assembler has + something to render. + """ + with open_db(db_path) as conn: + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB")) + append_event( + conn, + kind="you_authored", + payload={"name": "Me", "pronouns": "they/them", "persona": ""}, + ) + append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_bot_a", + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + append_event( + conn, + kind="container_created", + payload={ + "chat_id": "chat_bot_a", + "name": "office", + "type": "workplace", + "properties": {}, + }, + ) + # Parent (you-scene) opens first. + append_event( + conn, + kind="scene_opened", + payload={ + "chat_id": "chat_bot_a", + "container_id": 1, + "started_at": "2026-04-26T20:00:00+00:00", + "participants": ["you", "bot_a", "bot_b"], + }, + ) + # Meanwhile child scene — bot_a + bot_b only, parent linked. + append_event( + conn, + kind="meanwhile_scene_started", + payload={ + "scene_id": 2, + "chat_id": "chat_bot_a", + "parent_scene_id": 1, + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "started_at": "2026-04-26T20:05:00+00:00", + }, + ) + # Seed both directed edges between the bots so state-update + # writes land on initialized rows. + for src, tgt in [("bot_a", "bot_b"), ("bot_b", "bot_a")]: + append_event( + conn, + kind="edge_update", + payload={ + "source_id": src, + "target_id": tgt, + "chat_id": "chat_bot_a", + "knowledge_facts": [], + }, + ) + for entity_id, verb in [("bot_a", "listening"), ("bot_b", "talking")]: + append_event( + conn, + kind="activity_change", + payload={ + "entity_id": entity_id, + "posture": "sitting", + "action": { + "verb": verb, + "interruptible": True, + "required_attention": "low", + "expected_duration": "ongoing", + }, + "attention": "", + "holding": [], + "status": {}, + }, + ) + project(conn) + + +def _override_llm(canned: list[str]) -> MockLLMClient: + from chat.web.kickoff import get_llm_client + + mock = MockLLMClient(canned=list(canned)) + app.dependency_overrides[get_llm_client] = lambda: mock + return mock + + +def _zero_state() -> str: + return json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + + +@pytest.fixture +def app_state_setup(tmp_path, monkeypatch): + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + db = tmp_path / "test.db" + monkeypatch.setenv("CHAT_DB_PATH", str(db)) + with TestClient(app) as c: + app.state.background_worker.enabled = False + yield c + app.dependency_overrides.clear() + + +def test_meanwhile_turn_writes_memories_with_witness_0_1_1( + app_state_setup, tmp_path +): + """A meanwhile turn writes one ``memory_written`` event per bot — host + and guest — with witness flags ``[you=0, host=1, guest=1]``. "You" is + not present in the scene, so the witness_you flag must be 0 for both + rows. + + Canned queue (4 calls): + 1. parse_turn (user prose classification) + 2. narrative stream (host speaks first; no prior meanwhile turn) + 3. state-update for bot_a -> bot_b + 4. state-update for bot_b -> bot_a + """ + _seed_meanwhile_chat(tmp_path / "test.db") + canned_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they exchange a glance"}]} + ) + canned = [ + canned_parse, + "BotA leans in. *quietly* Tell me what you saw.", + _zero_state(), + _zero_state(), + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "they exchange a glance"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + rows = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'memory_written' " + "ORDER BY id" + ).fetchall() + payloads = [json.loads(r[0]) for r in rows] + + assert len(payloads) == 2 + owners = sorted(p["owner_id"] for p in payloads) + assert owners == ["bot_a", "bot_b"] + for p in payloads: + assert p["witness_you"] == 0, p + assert p["witness_host"] == 1, p + assert p["witness_guest"] == 1, p + + +def test_meanwhile_turn_emits_2_edge_updates_only(app_state_setup, tmp_path): + """A meanwhile turn runs state-update for exactly 2 directed pairs: + host -> guest and guest -> host. No you-related pairs fire. + """ + _seed_meanwhile_chat(tmp_path / "test.db") + canned_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they whisper"}]} + ) + canned = [ + canned_parse, + "BotA whispers. *softly* I noticed something today.", + _zero_state(), + _zero_state(), + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "they whisper"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + # Edge updates landed AFTER the assistant_turn (i.e. excluding + # the seed updates done before the request). + max_at = conn.execute( + "SELECT MAX(id) FROM event_log WHERE kind = 'assistant_turn'" + ).fetchone()[0] + rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'edge_update' AND id > ? ORDER BY id", + (max_at,), + ).fetchall() + payloads = [json.loads(r[0]) for r in rows] + + # Exactly 2 post-turn edge_update events. + assert len(payloads) == 2 + pairs = sorted((p["source_id"], p["target_id"]) for p in payloads) + assert pairs == [("bot_a", "bot_b"), ("bot_b", "bot_a")] + # And NO you-related pair leaked in. + for p in payloads: + assert p["source_id"] != "you", p + assert p["target_id"] != "you", p + + +def test_meanwhile_turn_alternates_speaker(app_state_setup, tmp_path): + """Successive meanwhile turns alternate which bot speaks. + + The first turn has no prior meanwhile ``assistant_turn`` linked to + this scene, so the host speaks. The second turn finds the latest + such ``assistant_turn``'s speaker (the host) and picks the OTHER + bot, so the guest speaks. Each ``assistant_turn`` payload carries + ``meanwhile_scene_id`` so the alternation lookup is unambiguous. + """ + _seed_meanwhile_chat(tmp_path / "test.db") + canned_parse_1 = json.dumps( + {"segments": [{"kind": "narration", "text": "they pause"}]} + ) + canned_1 = [ + canned_parse_1, + "BotA speaks first. *quietly*", + _zero_state(), + _zero_state(), + ] + mock = _override_llm(canned_1) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "they pause"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + canned_parse_2 = json.dumps( + {"segments": [{"kind": "narration", "text": "and again"}]} + ) + canned_2 = [ + canned_parse_2, + "BotB replies. *thoughtfully*", + _zero_state(), + _zero_state(), + ] + mock = _override_llm(canned_2) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "and again"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'assistant_turn' ORDER BY id" + ).fetchall() + payloads = [json.loads(r[0]) for r in rows] + + assert len(payloads) == 2 + # First turn — host speaks. + assert payloads[0]["speaker_id"] == "bot_a" + # Second turn — guest speaks (alternation). + assert payloads[1]["speaker_id"] == "bot_b" + # Both payloads tag this meanwhile scene id so the alternation + # lookup can scope to it specifically (not any other assistant_turn + # that might exist on the chat). + assert payloads[0]["meanwhile_scene_id"] == 2 + assert payloads[1]["meanwhile_scene_id"] == 2 + # Both also carry the present_set_kind discriminator for downstream + # filters (digest creation, drawer rendering). + assert payloads[0]["present_set_kind"] == "host_guest" + assert payloads[1]["present_set_kind"] == "host_guest" + + +def test_meanwhile_scene_close_writes_per_pov_for_both_bots_only( + app_state_setup, tmp_path +): + """When a meanwhile scene closes, per-POV summary rewrites land for + the host and the guest. No write fires for "you" — there is no + "you" memory store and no "you" POV in the meanwhile present set. + """ + from chat.services.scene_summarize import apply_scene_close_summary + from chat.eventlog.log import append_and_apply + + _seed_meanwhile_chat(tmp_path / "test.db") + + # Run a meanwhile turn first so each bot has a memory row scoped to + # the meanwhile scene_id (=2). The per-POV rewrite targets these + # rows by ``scene_id``. + canned_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they speak quietly"}]} + ) + canned = [ + canned_parse, + "BotA speaks. *quietly*", + _zero_state(), + _zero_state(), + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "they speak quietly"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + # Close the meanwhile scene and run the close-summary pipeline. + # Two POV summaries (host + guest) — no "you" POV. + pov_payload_host = json.dumps( + { + "summary": "BotA reflects on the quiet moment with BotB.", + "knowledge_facts": [], + "relationship_summary": "", + } + ) + pov_payload_guest = json.dumps( + { + "summary": "BotB notices BotA's reserved manner.", + "knowledge_facts": [], + "relationship_summary": "", + } + ) + close_mock = MockLLMClient(canned=[pov_payload_host, pov_payload_guest]) + + import asyncio as _asyncio + + with open_db(tmp_path / "test.db") as conn: + # asyncio.run() can't nest under TestClient's loop, but the + # close pipeline is awaitable — drive it via a fresh loop here. + _loop = _asyncio.new_event_loop() + # Mark the meanwhile scene closed via the projector handler. + append_and_apply( + conn, + kind="meanwhile_scene_closed", + payload={ + "scene_id": 2, + "closed_at": "2026-04-26T20:30:00+00:00", + }, + ) + + # apply_scene_close_summary takes host_bot_id; here we tell it to + # operate on the meanwhile scene id (2). With no "you" memory + # row to rewrite (witness_you=0 means "you" doesn't have a + # memory for this scene), the call must produce per-POV writes + # ONLY for bot_a and bot_b. + try: + _loop.run_until_complete( + apply_scene_close_summary( + conn, + close_mock, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=2, + host_bot_id="bot_a", + ) + ) + finally: + _loop.close() + + # Per-POV memory rewrites: count manual_edits with target_kind + # ``memory_pov_summary`` whose target_id maps to a memory row + # scoped to scene 2. + edits = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'manual_edit'" + ).fetchall() + pov_edits = [] + for (raw,) in edits: + payload = json.loads(raw) + if payload.get("target_kind") != "memory_pov_summary": + continue + mem_row = conn.execute( + "SELECT owner_id, scene_id FROM memories WHERE id = ?", + (payload["target_id"],), + ).fetchone() + if mem_row is None or mem_row[1] != 2: + continue + pov_edits.append({"owner": mem_row[0], "new": payload["new_value"]}) + + # Verify the actual current pov_summary on each bot's memory row + # for scene 2 reflects the rewrite. + host_pov = conn.execute( + "SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = ?", + ("bot_a", 2), + ).fetchone() + guest_pov = conn.execute( + "SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = ?", + ("bot_b", 2), + ).fetchone() + # No "you" memory row should exist for the meanwhile scene — + # "you" was never a witness. + you_row = conn.execute( + "SELECT id FROM memories WHERE owner_id = 'you' AND scene_id = ?", + (2,), + ).fetchone() + + # Exactly two memory_pov_summary rewrites — one per bot witness. + assert len(pov_edits) == 2 + owners = sorted(e["owner"] for e in pov_edits) + assert owners == ["bot_a", "bot_b"] + assert host_pov is not None and "BotA reflects" in host_pov[0] + assert guest_pov is not None and "BotB notices" in guest_pov[0] + # No "you" POV row — meanwhile scenes don't surface a you-memory. + assert you_row is None + + +def test_meanwhile_turn_registered_in_in_flight_tasks( + app_state_setup, tmp_path +): + """A meanwhile turn registers its streaming task in the chat-keyed + ``_in_flight_tasks`` registry the cancel route reads from, and clears + the entry after the stream completes. + + Without registration, ``POST /chats//turns/cancel`` would be a + silent no-op for meanwhile beats — the Stop button wouldn't actually + stop them. We pin the behaviour via a streaming mock that snapshots + ``_in_flight_tasks`` at the moment of its first yield (mid-flight), + then assert the entry is removed after the response returns. + """ + from typing import AsyncIterator, Sequence + + from chat.llm.client import Message + from chat.web.turns import _in_flight_tasks + + _seed_meanwhile_chat(tmp_path / "test.db") + + # Snapshot of (chat_id-present?, registered task object) captured + # at the first stream yield. The closure runs inside the streaming + # coroutine, so when it executes the task is alive and registered. + in_flight_snapshot: dict = {} + + class _SnapshotMock(MockLLMClient): + async def stream( + self, messages: Sequence[Message], *, model: str, **params + ) -> AsyncIterator[str]: + text = self._canned.pop(0) + for i, ch in enumerate(text): + if i == 0: + # Snapshot at first yield — the post_turn coroutine + # is awaiting our generator and the streaming Task + # is registered in _in_flight_tasks[chat_id]. + in_flight_snapshot["present"] = ( + "chat_bot_a" in _in_flight_tasks + ) + in_flight_snapshot["task"] = _in_flight_tasks.get( + "chat_bot_a" + ) + yield ch + + canned_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they exchange a glance"}]} + ) + mock = _SnapshotMock( + canned=[ + canned_parse, + "BotA leans in. *quietly*", + _zero_state(), + _zero_state(), + ] + ) + from chat.web.kickoff import get_llm_client + + app.dependency_overrides[get_llm_client] = lambda: mock + try: + # Pre-condition: registry is empty for this chat. + assert "chat_bot_a" not in _in_flight_tasks + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "they exchange a glance"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + + # Mid-flight: the streaming task was present in the registry, and + # the captured value was an asyncio.Task (not None / not some other + # placeholder). + import asyncio + + assert in_flight_snapshot.get("present") is True, ( + "_in_flight_tasks was empty at first yield — meanwhile stream " + "isn't registering its task" + ) + assert isinstance(in_flight_snapshot.get("task"), asyncio.Task) + # Post-flight: the entry has been cleaned up so the next turn (or + # the cancel route) doesn't see a stale task. + assert "chat_bot_a" not in _in_flight_tasks