diff --git a/tests/test_phase3_integration.py b/tests/test_phase3_integration.py new file mode 100644 index 0000000..11a9bb0 --- /dev/null +++ b/tests/test_phase3_integration.py @@ -0,0 +1,1241 @@ +"""Phase 3 cross-feature integration tests (T66). + +These tests exercise multi-feature flows end-to-end. Phase 3 introduced +several cross-feature interaction surfaces (event lifecycle + promotion, +threads on scene close, jump-skip synthesized memories with retrieval, +meanwhile digests surfacing across scene boundaries, and meanwhile + +you-scene coexistence with witness-filtered memories). Each test below +drives the actual HTTP / service entry points, mocks the LLM with a +canned queue annotated for the precise call sequence, and asserts on +both the event_log AND the projected state after each action. + +Wave 6b's cross-feature merge surfaced canned-queue interaction bugs; +the goal here is to catch that class of regression in the test suite +before it ships. + +Five scenarios: + +1. ``test_event_lifecycle_promotion_lands_memory_and_edge`` — Plan event + → play turns → ``event_started`` detected → ``event_completed`` + detected → promotion fires → memory + edge updates land. +2. ``test_thread_open_on_close_renders_then_close_via_drawer_drops`` — + Open a thread on close → next scene's prompt includes the open thread + → close thread via drawer → next scene's prompt no longer includes it. +3. ``test_jump_skip_synthesized_memories_retrievable_next_turn`` — + Jump skip → synthesized memories land per present bot → next turn's + prompt retrieves them via search. +4. ``test_meanwhile_close_digest_surfaces_then_consumed`` — Meanwhile + scene → close → digest pending → first you-turn prompt includes + digest → after consumption, digest no longer renders. +5. ``test_meanwhile_and_you_scene_witness_filtered_memories`` — + Meanwhile while a regular you-scene is active → both scenes have + memories; querying memories for either bot returns the right + witness-filtered slices. 
+ +Cross-feature notes discovered while writing these tests: + +- The thread-detection call on every scene close (T58.2) is wrapped in + try/except so its canned-queue slot is OPTIONAL — an IndexError is + swallowed. Tests that don't care about thread coverage can omit the + slot; test 2 includes a valid thread response to exercise the path. +- ``consume_pending_meanwhile_digests`` is defined in chat.services.prompt + but is NOT currently wired into the post_turn flow. The digest stays + pending across turns until the helper is called explicitly. Test 4 + reflects this: it asserts the digest renders pre-consumption and no + longer renders post-consumption (consumption is driven via the helper + directly), and that the meanwhile_digest_consumed event lands in the + event_log. +- The host-only ``apply_scene_close_summary`` canned queue layout is + ``[host_pov, thread_detection]`` (2 slots) when a single bot is present + and there are dialogue rows, with thread_detection being optional / + swallowed on IndexError. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +from fastapi.testclient import TestClient + +from chat.app import app +from chat.db.connection import open_db +from chat.eventlog.log import append_and_apply, append_event +from chat.eventlog.projector import project +from chat.llm.mock import MockLLMClient +import chat.state.meanwhile # noqa: F401 -- register handlers + + +# --------------------------------------------------------------------------- +# Shared fixtures. 
# ---------------------------------------------------------------------------


def _bot_payload(bot_id: str, name: str, persona: str = "") -> dict:
    """Return a minimal ``bot_authored`` payload for ``bot_id``.

    An empty ``persona`` falls back to ``"persona for <name>"`` so every
    seeded bot carries a non-empty persona string.
    """
    return {
        "id": bot_id,
        "name": name,
        "persona": persona or f"persona for {name}",
        "voice_samples": [],
        "traits": [],
        "backstory": "",
        "initial_relationship_to_you": "",
        "kickoff_prose": "...",
    }


def _zero_state() -> str:
    """Return a canned state-update response that carries zero deltas and
    no knowledge facts.
    """
    return json.dumps(
        {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
    )


def _override_llm(canned: list[str]) -> MockLLMClient:
    """Wire a fresh MockLLMClient and return it so tests can introspect
    the residual canned queue after the request.

    The queue is copied, so the caller's list is never mutated by the mock.
    """
    from chat.web.kickoff import get_llm_client

    mock = MockLLMClient(canned=list(canned))
    app.dependency_overrides[get_llm_client] = lambda: mock
    return mock


@pytest.fixture
def app_state_setup(tmp_path, monkeypatch):
    """Per-test environment + TestClient. Mirrors the pattern used by
    tests/test_turn_flow.py and tests/test_meanwhile_turn_flow.py.

    Points ``CHAT_CONFIG_PATH`` / ``CHAT_DB_PATH`` at files under
    ``tmp_path``, disables the background worker, and yields the client.
    """
    cfg = tmp_path / "config.toml"
    cfg.write_text('featherless_api_key = "test"\n', encoding="utf-8")
    monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
    db = tmp_path / "test.db"
    monkeypatch.setenv("CHAT_DB_PATH", str(db))
    try:
        with TestClient(app) as c:
            app.state.background_worker.enabled = False
            yield c
    finally:
        # Always drop overrides -- even when app startup/shutdown raises --
        # so a failing test cannot leak its mock LLM into the next one.
        app.dependency_overrides.clear()


def _activity_payload(entity_id: str, verb: str) -> dict:
    """Return an ``activity_change`` payload: ``entity_id`` sitting and
    performing the interruptible, low-attention, ongoing action ``verb``.
    """
    return {
        "entity_id": entity_id,
        "posture": "sitting",
        "action": {
            "verb": verb,
            "interruptible": True,
            "required_attention": "low",
            "expected_duration": "ongoing",
        },
        "attention": "",
        "holding": [],
        "status": {},
    }


def _seed_chat(
    db_path: Path,
    *,
    bots: list[tuple[str, str]],
    edges: list[tuple[str, str]],
    activities: list[tuple[str, str]],
) -> None:
    """Append the seed events for chat ``chat_bot_a`` and project them.

    ``bots`` is a list of ``(bot_id, name)`` pairs: the first entry is the
    host, an optional second entry is the guest. ``edges`` lists the
    directed ``(source_id, target_id)`` pairs to initialise and
    ``activities`` lists ``(entity_id, verb)`` pairs. Events are appended
    in a fixed order: bots, you, chat, container, scene, edges, activities.
    """
    chat_id = "chat_bot_a"
    started_at = "2026-04-26T20:00:00+00:00"
    bot_ids = [bot_id for bot_id, _name in bots]

    # Build the chat payload incrementally so ``guest_bot_id`` is only
    # present for two-bot chats and sits right after ``host_bot_id``.
    chat_payload = {"id": chat_id, "host_bot_id": bot_ids[0]}
    if len(bot_ids) > 1:
        chat_payload["guest_bot_id"] = bot_ids[1]
    chat_payload["initial_time"] = started_at
    chat_payload["narrative_anchor"] = "Day 1"
    chat_payload["weather"] = ""

    with open_db(db_path) as conn:
        for bot_id, name in bots:
            append_event(
                conn, kind="bot_authored", payload=_bot_payload(bot_id, name)
            )
        append_event(
            conn,
            kind="you_authored",
            payload={"name": "Me", "pronouns": "they/them", "persona": ""},
        )
        append_event(conn, kind="chat_created", payload=chat_payload)
        append_event(
            conn,
            kind="container_created",
            payload={
                "chat_id": chat_id,
                "name": "office",
                "type": "workplace",
                "properties": {},
            },
        )
        append_event(
            conn,
            kind="scene_opened",
            payload={
                "chat_id": chat_id,
                "container_id": 1,
                "started_at": started_at,
                "participants": ["you", *bot_ids],
            },
        )
        for source_id, target_id in edges:
            append_event(
                conn,
                kind="edge_update",
                payload={
                    "source_id": source_id,
                    "target_id": target_id,
                    "chat_id": chat_id,
                    "knowledge_facts": [],
                },
            )
        for entity_id, verb in activities:
            append_event(
                conn,
                kind="activity_change",
                payload=_activity_payload(entity_id, verb),
            )
        project(conn)


def _seed_single_bot_chat(db_path: Path) -> None:
    """Author BotA + you, create chat with active scene, seed an
    edge + activities so the prompt assembler has something to render.
    """
    _seed_chat(
        db_path,
        bots=[("bot_a", "BotA")],
        edges=[("bot_a", "you"), ("you", "bot_a")],
        activities=[("you", "talking"), ("bot_a", "listening")],
    )


def _seed_two_bot_chat(db_path: Path) -> None:
    """Author BotA + BotB + you, create a chat with both wired in, an
    open scene, edges for all 6 directed pairs, activities for all three.
    """
    _seed_chat(
        db_path,
        bots=[("bot_a", "BotA"), ("bot_b", "BotB")],
        edges=[
            ("bot_a", "you"),
            ("you", "bot_a"),
            ("bot_b", "you"),
            ("you", "bot_b"),
            ("bot_a", "bot_b"),
            ("bot_b", "bot_a"),
        ],
        activities=[
            ("you", "talking"),
            ("bot_a", "listening"),
            ("bot_b", "listening"),
        ],
    )


# ---------------------------------------------------------------------------
# 1. Event lifecycle: plan -> active -> completed -> promotion lands.
# ---------------------------------------------------------------------------


def _post_with_canned(
    client: TestClient,
    url: str,
    canned: list[str],
    *,
    data: dict,
    expected_status: int,
    label: str,
) -> None:
    """POST ``data`` to ``url`` with the LLM mocked by ``canned``.

    Asserts the response status equals ``expected_status``, always clears
    the dependency overrides afterwards, and then asserts the request
    consumed every canned slot. ``label`` names the request in the
    failure message when slots are left over.
    """
    mock = _override_llm(canned)
    try:
        response = client.post(url, data=data)
        assert response.status_code == expected_status
    finally:
        app.dependency_overrides.clear()
    assert mock._canned == [], (
        f"{label} left canned slots unconsumed: {mock._canned}"
    )


def test_event_lifecycle_promotion_lands_memory_and_edge(
    app_state_setup, tmp_path
):
    """Plan an event with a knowledge_facts prop, drive a turn that the
    classifier flags ``new_status='active'``, then drive a second turn
    that flags ``new_status='completed'``. Assert:

    * ``event_started`` lands after turn 1 with the correct event_id.
    * ``event_completed`` lands after turn 2.
    * ``promote_completed_event`` runs inline, emitting a follow-on
      ``edge_update`` (source='event_promotion') carrying the planned fact.
    * The directed bot_a -> you edge actually carries the fact in its
      knowledge list (i.e. the projector applied the promotion).

    Canned queue per turn (single-bot, scene active, no guest, so no
    addressee classifier and no interjection branch):
      1. parse_turn (user prose classifier)
      2. narrative stream
      3. state-update bot_a -> you
      4. state-update you -> bot_a
      5. detect_event_transitions -> active (turn 1) / completed (turn 2)
      6. detect_scene_close -> False

    Both turns include the scene_close slot — detect_scene_close runs on
    every turn that has a non-empty prose AND an active scene. Memory
    writes fire 1 per turn for single-bot (host POV only).
    """
    db_path = tmp_path / "test.db"
    _seed_single_bot_chat(db_path)

    # Plan an event whose props carry a knowledge_fact for promotion.
    with open_db(db_path) as conn:
        append_and_apply(
            conn,
            kind="event_planned",
            payload={
                "event_id": "evt_dinner",
                "chat_id": "chat_bot_a",
                "kind": "dinner_with_friend",
                "props": {
                    "knowledge_facts": [
                        {
                            "owner_id": "bot_a",
                            "target_id": "you",
                            "fact": "Maya enjoyed the wine choice",
                        }
                    ]
                },
                "planned_for": "2026-04-26T20:30:00+00:00",
            },
        )

    # Shared filter selecting edge_update rows emitted by event promotion.
    promotion_where = (
        "WHERE kind = 'edge_update' "
        "AND json_extract(payload_json, '$.source') = 'event_promotion'"
    )

    # ---- Turn 1: classifier flags event as active. ----
    canned_parse_1 = json.dumps(
        {"segments": [{"kind": "narration", "text": "we sit down at the table"}]}
    )
    canned_event_active = json.dumps(
        {
            "transitions": [
                {
                    "event_id": "evt_dinner",
                    "new_status": "active",
                    "reason": "they sat down",
                }
            ]
        }
    )
    canned_close_no = json.dumps({"should_close": False, "reason": "no signal"})

    # Turn 1 layout: parse + narrative + 2 state-updates + event_decision +
    # scene_close. 6 slots total (single-bot has 2 directed pairs).
    _post_with_canned(
        app_state_setup,
        "/chats/chat_bot_a/turns",
        [
            canned_parse_1,
            "Maya glances around the dining room.",
            _zero_state(),
            _zero_state(),
            canned_event_active,
            canned_close_no,
        ],
        data={"prose": "we sit down at the table"},
        expected_status=204,
        label="turn 1",
    )

    # event_started landed; event row reflects active.
    with open_db(db_path) as conn:
        started_rows = conn.execute(
            "SELECT payload_json FROM event_log WHERE kind = 'event_started'"
        ).fetchall()
        assert len(started_rows) == 1
        assert json.loads(started_rows[0][0])["event_id"] == "evt_dinner"

        ev_row = conn.execute(
            "SELECT status FROM events WHERE event_id = 'evt_dinner'"
        ).fetchone()
        assert ev_row is not None and ev_row[0] == "active"

        # No promotion has fired yet (only completion triggers promotion).
        promo_count = conn.execute(
            f"SELECT COUNT(*) FROM event_log {promotion_where}"
        ).fetchone()[0]
        assert promo_count == 0

    # ---- Turn 2: classifier flags event as completed. ----
    canned_parse_2 = json.dumps(
        {"segments": [{"kind": "narration", "text": "we wrap up the meal"}]}
    )
    canned_event_completed = json.dumps(
        {
            "transitions": [
                {
                    "event_id": "evt_dinner",
                    "new_status": "completed",
                    "reason": "wrapped up",
                }
            ]
        }
    )
    _post_with_canned(
        app_state_setup,
        "/chats/chat_bot_a/turns",
        [
            canned_parse_2,
            "Maya signals for the check.",
            _zero_state(),
            _zero_state(),
            canned_event_completed,
            canned_close_no,
        ],
        data={"prose": "we wrap up the meal"},
        expected_status=204,
        label="turn 2",
    )

    with open_db(db_path) as conn:
        # event_completed landed.
        completed_rows = conn.execute(
            "SELECT id, payload_json FROM event_log "
            "WHERE kind = 'event_completed'"
        ).fetchall()
        assert len(completed_rows) == 1
        assert json.loads(completed_rows[0][1])["event_id"] == "evt_dinner"

        # promote_completed_event ran inline — an edge_update with
        # source=event_promotion lands carrying the planned fact.
        promo_rows = conn.execute(
            f"SELECT payload_json FROM event_log {promotion_where}"
        ).fetchall()
        promo_facts: list[str] = []
        for (raw,) in promo_rows:
            promo_facts.extend(json.loads(raw).get("knowledge_facts") or [])
        assert "Maya enjoyed the wine choice" in promo_facts

        # The directed bot_a -> you edge surfaces the fact.
        from chat.state.edges import get_edge

        edge = get_edge(conn, "bot_a", "you")
        assert edge is not None
        assert "Maya enjoyed the wine choice" in (edge.get("knowledge") or [])

        # Memory writes: 1 per turn for single-bot, so 2 in total.
        mem_count = conn.execute(
            "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'"
        ).fetchone()[0]
        assert mem_count == 2


# ---------------------------------------------------------------------------
# 2. Threads: open on close -> renders -> close via drawer -> drops.
# ---------------------------------------------------------------------------


def test_thread_open_on_close_renders_then_close_via_drawer_drops(
    app_state_setup, tmp_path
):
    """Drive a turn whose prose hard-signals close, classifier confirms
    close, and the close pipeline opens a thread (T58.2). Then assemble
    a fresh narrative prompt and assert the open thread renders. Close
    the thread via the drawer route. Re-assemble — the thread is gone.

    Canned queue (single-bot turn that closes the scene):
      1. parse_turn
      2. narrative stream
      3. state-update bot_a -> you
      4. state-update you -> bot_a
      5. detect_scene_close -> True (no event slot — no active events)
      6. apply_scene_close_summary host POV
      7. detect_threads -> 1 open thread

    No event_decision slot — list_active_events is empty so the
    classifier short-circuits per T52 (verified by the consumed queue
    assertion below).
    """
    db_path = tmp_path / "test.db"
    _seed_single_bot_chat(db_path)

    canned_parse = json.dumps(
        {"segments": [{"kind": "narration", "text": "we are done here, fade out"}]}
    )
    canned_close_yes = json.dumps(
        {"should_close": True, "reason": "fade out"}
    )
    canned_pov = json.dumps(
        {
            "summary": "BotA noticed an unresolved tension before the fade.",
            "knowledge_facts": [],
            "relationship_summary": "",
        }
    )
    # Thread detection — single open candidate. The detect_threads service
    # consumes this slot; if it had returned no candidates the slot still
    # gets consumed, so we always count it.
    canned_threads = json.dumps(
        {
            "candidates": [
                {
                    "action": "open",
                    "title": "the missing key",
                    "summary": "Couldn't find the key before BotA left.",
                    "existing_thread_id": None,
                }
            ]
        }
    )

    _post_with_canned(
        app_state_setup,
        "/chats/chat_bot_a/turns",
        [
            canned_parse,
            "BotA pauses, then heads for the door.",
            _zero_state(),
            _zero_state(),
            canned_close_yes,
            canned_pov,
            canned_threads,
        ],
        data={"prose": "we are done here, fade out"},
        expected_status=204,
        label="turn 1",
    )

    with open_db(db_path) as conn:
        # scene_closed landed.
        scene_close_count = conn.execute(
            "SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'"
        ).fetchone()[0]
        assert scene_close_count == 1

        # thread_opened landed.
        thread_rows = conn.execute(
            "SELECT payload_json FROM event_log WHERE kind = 'thread_opened'"
        ).fetchall()
        assert len(thread_rows) == 1
        thread_payload = json.loads(thread_rows[0][0])
        assert thread_payload["title"] == "the missing key"
        thread_id = thread_payload["thread_id"]

    # The next prompt assembly must surface the open thread block.
    from chat.services.prompt import assemble_narrative_prompt

    with open_db(db_path) as conn:
        msgs = assemble_narrative_prompt(
            conn,
            chat_id="chat_bot_a",
            speaker_bot_id="bot_a",
            recent_dialogue=[],
            retrieved_memory_summaries=[],
        )
        body = msgs[0].content
        assert "Open threads:" in body
        assert "the missing key" in body

    # Now close the thread via the drawer route.
    response = app_state_setup.post(
        f"/chats/chat_bot_a/drawer/thread/close/{thread_id}"
    )
    assert response.status_code == 200

    with open_db(db_path) as conn:
        # thread_closed event landed.
        closed_rows = conn.execute(
            "SELECT payload_json FROM event_log WHERE kind = 'thread_closed'"
        ).fetchall()
        assert len(closed_rows) == 1
        assert json.loads(closed_rows[0][0])["thread_id"] == thread_id

        # Re-assemble — the open-threads block is gone.
        msgs2 = assemble_narrative_prompt(
            conn,
            chat_id="chat_bot_a",
            speaker_bot_id="bot_a",
            recent_dialogue=[],
            retrieved_memory_summaries=[],
        )
        body2 = msgs2[0].content
        assert "Open threads:" not in body2
        assert "the missing key" not in body2


# ---------------------------------------------------------------------------
# 3. Jump skip: synthesized memories land + retrievable on next turn.
# ---------------------------------------------------------------------------


def test_jump_skip_synthesized_memories_retrievable_next_turn(
    app_state_setup, tmp_path
):
    """Drive a jump skip via the drawer route with non-empty notable_prose.
    The skip controller writes synthesized memories for the host bot,
    then a subsequent narrative turn's prompt assembly must surface
    them via FTS5 search when the query overlaps the memory text.

    Canned queue for the jump skip (single-bot, no guest):
      1. synthesize_memories digest (1 memory, single host bot)
      2. narrate_skip (assistant_turn narration)

    Canned queue for the follow-up turn (single-bot, scene still open
    after the jump because jump only advances the clock):
      1. parse_turn
      2. narrative stream
      3. state-update bot_a -> you
      4. state-update you -> bot_a
      5. detect_scene_close -> False

    The post-skip retrieval is verified two ways:
      * The memory row exists in ``memories`` for owner=bot_a with
        ``source='synthesized'`` and the seeded text.
      * ``search_memories`` returns the memory when queried by a token
        from the synthesized prose; we don't try to assert the retrieved
        memory shows up in the assembled prompt body, because the prompt
        assembler picks its query from container/anchor (which doesn't
        overlap the synthesized prose) — we instead drive the search
        directly. Future work: pin the assembled-prompt-includes-it
        contract once a deliberate query-builder lands.
    """
    # Imported once here; both retrieval checks below share the name.
    from chat.state.memory import search_memories

    db_path = tmp_path / "test.db"
    _seed_single_bot_chat(db_path)

    # ---- Jump skip via drawer. ----
    digest_json = json.dumps(
        {
            "memories": [
                {
                    "text": "Maya bumped into Alex at the cafe and they argued.",
                    "significance": 2,
                    "affinity_delta": 0,
                    "trust_delta": 0,
                }
            ]
        }
    )
    narration = "Hours pass; Maya returns visibly off-kilter."
    _post_with_canned(
        app_state_setup,
        "/chats/chat_bot_a/drawer/skip/jump",
        [digest_json, narration],
        data={
            "new_time": "2026-04-26T22:00:00+00:00",
            "notable_prose": "I bumped into Alex at the cafe and we argued.",
            "reset_activity": "",
        },
        expected_status=200,
        label="jump skip",
    )

    # Verify the synthesized memory landed for the host bot.
    with open_db(db_path) as conn:
        rows = conn.execute(
            "SELECT payload_json FROM event_log WHERE kind = 'memory_written'"
        ).fetchall()
        written = [json.loads(raw) for (raw,) in rows]
        synth_payloads = [
            payload
            for payload in written
            if payload.get("source") == "synthesized"
        ]
        assert len(synth_payloads) == 1
        assert synth_payloads[0]["owner_id"] == "bot_a"
        assert "Alex" in synth_payloads[0]["pov_summary"]

        # The memory is retrievable via search_memories — host POV.
        hits = search_memories(conn, "bot_a", "host", "Alex", k=4)
        assert len(hits) == 1
        assert hits[0]["pov_summary"].startswith("Maya bumped into Alex")
        assert hits[0]["source"] == "synthesized"
        # And the significance is preserved through the round-trip.
        assert hits[0]["significance"] == 2

    # ---- Follow-up turn: drive a normal turn so the post_turn flow runs
    # against the post-skip state. We don't assert the synthesized
    # memory appears verbatim in the prompt body (the assembler's query
    # is keyed on container/anchor, which doesn't overlap), but we do
    # verify the turn lands cleanly and the memory remains retrievable.
    canned_parse = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "what was that about?"}]}
    )
    canned_close_no = json.dumps(
        {"should_close": False, "reason": "no signal"}
    )
    _post_with_canned(
        app_state_setup,
        "/chats/chat_bot_a/turns",
        [
            canned_parse,
            "Maya hesitates. *quietly* I'd rather not talk about it.",
            _zero_state(),
            _zero_state(),
            canned_close_no,
        ],
        data={"prose": "what was that about?"},
        expected_status=204,
        label="follow-up turn",
    )

    # The synthesized memory is still retrievable post-turn (it wasn't
    # clobbered or hidden by the new turn's writes).
    with open_db(db_path) as conn:
        hits = search_memories(conn, "bot_a", "host", "Alex", k=4)
        assert any(
            h["source"] == "synthesized" and "Alex" in h["pov_summary"]
            for h in hits
        )


# ---------------------------------------------------------------------------
# 4. Meanwhile close digest: pending -> renders in next you-turn prompt
#    -> consumed via helper -> no longer renders.
+# --------------------------------------------------------------------------- + + +def test_meanwhile_close_digest_surfaces_then_consumed( + app_state_setup, tmp_path +): + """Seed a parent you-scene + active meanwhile child scene. Drive one + meanwhile turn so each bot has a memory row scoped to scene 2. + Close the meanwhile scene + run apply_scene_close_summary inline. + The digest row lands. Next assemble a you-scene prompt — the + digest renders. Drive consume_pending_meanwhile_digests. Re-assemble + — the digest is gone, and a meanwhile_digest_consumed event landed. + + Cross-feature finding: ``consume_pending_meanwhile_digests`` is + defined in chat.services.prompt but is NOT wired into the post_turn + flow. The digest stays pending across turns until callers invoke + the helper. Test exercises the helper directly so the consumption + contract is pinned independent of any future post_turn integration. + + Canned queue for the meanwhile turn: + 1. parse_turn + 2. narrative stream + 3. state-update bot_a -> bot_b + 4. state-update bot_b -> bot_a + + Canned queue for apply_scene_close_summary on meanwhile scene: + 1. host POV summary + 2. guest POV summary + 3. digest summary (the meanwhile_digest_pending text) + 4. detect_threads (T58.2 always runs on close; meanwhile included) + """ + db_path = tmp_path / "test.db" + + # Seed the chat + parent you-scene + active meanwhile child scene. 
+ with open_db(db_path) as conn: + append_event( + conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA") + ) + append_event( + conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB") + ) + append_event( + conn, + kind="you_authored", + payload={"name": "Me", "pronouns": "they/them", "persona": ""}, + ) + append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_bot_a", + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + append_event( + conn, + kind="container_created", + payload={ + "chat_id": "chat_bot_a", + "name": "office", + "type": "workplace", + "properties": {}, + }, + ) + # Parent you-scene (id=1). + append_event( + conn, + kind="scene_opened", + payload={ + "chat_id": "chat_bot_a", + "container_id": 1, + "started_at": "2026-04-26T20:00:00+00:00", + "participants": ["you", "bot_a", "bot_b"], + }, + ) + # Meanwhile child (id=2) — bot_a + bot_b only. + append_event( + conn, + kind="meanwhile_scene_started", + payload={ + "scene_id": 2, + "chat_id": "chat_bot_a", + "parent_scene_id": 1, + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "started_at": "2026-04-26T20:05:00+00:00", + }, + ) + # Edges for bot pairs (state-update writes need initialized rows). 
+ for src, tgt in [ + ("bot_a", "you"), + ("bot_b", "you"), + ("bot_a", "bot_b"), + ("bot_b", "bot_a"), + ]: + append_event( + conn, + kind="edge_update", + payload={ + "source_id": src, + "target_id": tgt, + "chat_id": "chat_bot_a", + "knowledge_facts": [], + }, + ) + for entity_id, verb in [("bot_a", "listening"), ("bot_b", "talking")]: + append_event( + conn, + kind="activity_change", + payload={ + "entity_id": entity_id, + "posture": "sitting", + "action": { + "verb": verb, + "interruptible": True, + "required_attention": "low", + "expected_duration": "ongoing", + }, + "attention": "", + "holding": [], + "status": {}, + }, + ) + project(conn) + + # ---- Drive a meanwhile turn so each bot has a memory in scene 2. ---- + canned_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they whisper"}]} + ) + mock = _override_llm( + [ + canned_parse, + "BotA leans in. *softly* I have to tell you something.", + _zero_state(), + _zero_state(), + ] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "they whisper"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + # ---- Close the meanwhile scene + run apply_scene_close_summary. ---- + import asyncio + from chat.services.scene_summarize import apply_scene_close_summary + + host_pov = json.dumps( + { + "summary": "BotA confided in BotB about the missing key.", + "knowledge_facts": [], + "relationship_summary": "", + } + ) + guest_pov = json.dumps( + { + "summary": "BotB listened and offered to help.", + "knowledge_facts": [], + "relationship_summary": "", + } + ) + digest_text = ( + "While you were away, BotA confided in BotB about a missing key." 
+ ) + digest_canned = json.dumps( + { + "summary": digest_text, + "knowledge_facts": [], + "relationship_summary": "", + } + ) + no_threads = json.dumps({"candidates": []}) + close_mock = MockLLMClient( + canned=[host_pov, guest_pov, digest_canned, no_threads] + ) + + with open_db(db_path) as conn: + # Mark the meanwhile scene closed so apply_scene_close_summary + # operates on a closed scene — same shape as the production + # close path in T64/T65. + append_and_apply( + conn, + kind="meanwhile_scene_closed", + payload={ + "scene_id": 2, + "closed_at": "2026-04-26T20:30:00+00:00", + }, + ) + loop = asyncio.new_event_loop() + try: + loop.run_until_complete( + apply_scene_close_summary( + conn, + close_mock, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=2, + host_bot_id="bot_a", + ) + ) + finally: + loop.close() + assert close_mock._canned == [], ( + f"close path left canned slots unconsumed: {close_mock._canned}" + ) + + # The digest landed in event_log + projection table. + from chat.state.meanwhile import list_pending_meanwhile_digests + + pending = list_pending_meanwhile_digests(conn, "chat_bot_a") + assert len(pending) == 1 + assert "missing key" in pending[0]["summary"] + + # ---- First you-scene prompt: the digest renders as a SHOULD-tier + # 'Meanwhile while you were away:' block. ---- + from chat.services.prompt import assemble_narrative_prompt + + with open_db(db_path) as conn: + msgs = assemble_narrative_prompt( + conn, + chat_id="chat_bot_a", + speaker_bot_id="bot_a", + recent_dialogue=[], + retrieved_memory_summaries=[], + ) + body = msgs[0].content + assert "Meanwhile while you were away:" in body + assert digest_text in body + + # ---- Consume + re-assemble. The digest is gone, and a + # meanwhile_digest_consumed event lands. 
---- + from chat.services.prompt import consume_pending_meanwhile_digests + + with open_db(db_path) as conn: + consumed = consume_pending_meanwhile_digests(conn, "chat_bot_a") + assert consumed == 1 + + consumed_rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'meanwhile_digest_consumed'" + ).fetchall() + assert len(consumed_rows) == 1 + + msgs2 = assemble_narrative_prompt( + conn, + chat_id="chat_bot_a", + speaker_bot_id="bot_a", + recent_dialogue=[], + retrieved_memory_summaries=[], + ) + body2 = msgs2[0].content + assert "Meanwhile while you were away:" not in body2 + assert digest_text not in body2 + + # Pending list is empty after consumption. + from chat.state.meanwhile import list_pending_meanwhile_digests + + assert list_pending_meanwhile_digests(conn, "chat_bot_a") == [] + + +# --------------------------------------------------------------------------- +# 5. Meanwhile + you-scene coexistence: both have memories with the right +# witness flags, retrievable per bot via search. +# --------------------------------------------------------------------------- + + +def test_meanwhile_and_you_scene_witness_filtered_memories( + app_state_setup, tmp_path +): + """Seed a parent you-scene + active meanwhile child scene. Drive + one meanwhile turn (host_guest present_set, [you=0, host=1, guest=1] + witness flags). Close the meanwhile scene so the post-meanwhile main + scene is the active scene. Drive a regular you-turn (you_host_guest + present_set, [you=1, host=1, guest=1] witness flags). Each bot now + has TWO memories — one from the meanwhile scene, one from the + you-scene. Witness-filtered search: + + * Querying owner=bot_a, witness_role='host' over a meanwhile-only + keyword returns the meanwhile memory (witness_host=1). + * Querying owner=bot_a, witness_role='host' over a you-scene-only + keyword returns the you-scene memory. 
+ * Querying owner=bot_b, witness_role='guest' over each keyword + similarly returns the right memory (the per-bot store is + separately witnessed). + + Canned queue for the meanwhile turn: + 1. parse_turn + 2. narrative stream + 3. state-update bot_a -> bot_b + 4. state-update bot_b -> bot_a + + Canned queue for the you-turn (post-meanwhile): + 1. parse_turn + 2. detect_addressee (host vs. guest -> host) + 3. narrative stream + 4-9. 6 state-update calls (full directed pairs over you/host/guest) + 10. detect_interjection -> False + 11. detect_scene_close -> False (scene stays open) + """ + db_path = tmp_path / "test.db" + _seed_two_bot_chat(db_path) + + # Seed an active meanwhile child scene (id=2) on top of the parent + # you-scene (id=1). + with open_db(db_path) as conn: + append_and_apply( + conn, + kind="meanwhile_scene_started", + payload={ + "scene_id": 2, + "chat_id": "chat_bot_a", + "parent_scene_id": 1, + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "started_at": "2026-04-26T20:05:00+00:00", + }, + ) + + # ---- Meanwhile turn: keyword 'pottery' so it's distinguishable from + # the you-turn keyword later. The narrative text drives memory + # pov_summary text via record_meanwhile_memory. + meanwhile_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they linger"}]} + ) + meanwhile_text = "BotA mentions a pottery class she's been taking." + mock = _override_llm( + [ + meanwhile_parse, + meanwhile_text, + _zero_state(), + _zero_state(), + ] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "they linger"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + # ---- Close the meanwhile scene so the next post_turn dispatches to + # the regular you-flow rather than meanwhile_turn_flow. 
+ with open_db(db_path) as conn: + append_and_apply( + conn, + kind="meanwhile_scene_closed", + payload={ + "scene_id": 2, + "closed_at": "2026-04-26T20:25:00+00:00", + }, + ) + + # ---- You-turn: keyword 'whiteboard' so the post-turn memory's text + # is distinguishable from the meanwhile memory above. 2-bot chat + # so the full directed-pair fan-out fires. + you_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "let's sketch this out"}]} + ) + addressee_decision = json.dumps( + { + "addressee_id": "bot_a", + "confidence": "medium", + "reason": "host", + } + ) + you_text = "BotA grabs a whiteboard marker and starts sketching." + you_close_no = json.dumps( + {"should_close": False, "reason": "scene continues"} + ) + you_interject_no = json.dumps( + {"should_interject": False, "reason": "calm"} + ) + mock = _override_llm( + [ + you_parse, + addressee_decision, + you_text, + _zero_state(), _zero_state(), _zero_state(), + _zero_state(), _zero_state(), _zero_state(), + you_interject_no, + you_close_no, + ] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "let's sketch this out"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [], ( + f"you-turn left canned slots unconsumed: {mock._canned}" + ) + + # ---- Verify memory shape across BOTH scenes for BOTH bots. ---- + with open_db(db_path) as conn: + rows = conn.execute( + "SELECT owner_id, scene_id, pov_summary, " + " witness_you, witness_host, witness_guest " + "FROM memories ORDER BY id" + ).fetchall() + + # Expect 4 rows: meanwhile (host+guest = 2) + you-turn (host+guest = 2). 
+ assert len(rows) == 4, ( + f"unexpected memory shape after both turns: {rows}" + ) + + meanwhile_rows = [r for r in rows if r[1] == 2] + you_scene_rows = [r for r in rows if r[1] != 2] + assert len(meanwhile_rows) == 2 + assert len(you_scene_rows) == 2 + + # Witness flags: meanwhile rows have witness_you=0; you-scene + # rows have witness_you=1. Both sets have witness_host=witness_guest=1. + for owner, _scene, _pov, w_you, w_host, w_guest in meanwhile_rows: + assert w_you == 0, (owner, w_you) + assert w_host == 1 + assert w_guest == 1 + for owner, _scene, _pov, w_you, w_host, w_guest in you_scene_rows: + assert w_you == 1, (owner, w_you) + assert w_host == 1 + assert w_guest == 1 + + # ---- Witness-filtered FTS5 search returns the right slice + # per (owner, witness_role, query). ---- + from chat.state.memory import search_memories + + # Host POV (bot_a as host): both keywords are visible because + # bot_a is owner of both scenes' rows AND witness_host=1 in both. + hits_pottery_host = search_memories( + conn, "bot_a", "host", "pottery", k=4 + ) + assert len(hits_pottery_host) == 1 + assert "pottery" in hits_pottery_host[0]["pov_summary"] + assert hits_pottery_host[0]["scene_id"] == 2 + + hits_whiteboard_host = search_memories( + conn, "bot_a", "host", "whiteboard", k=4 + ) + assert len(hits_whiteboard_host) == 1 + assert "whiteboard" in hits_whiteboard_host[0]["pov_summary"] + # The you-scene memory carries scene_id of the active scene at + # turn-time. We don't pin the scene_id value (active_scene helper + # determines it) but we DO pin that it's NOT the meanwhile id. + assert hits_whiteboard_host[0]["scene_id"] != 2 + + # Guest POV (bot_b as guest): same expectation, witness_guest=1 + # in both scenes' bot_b rows. 
+ hits_pottery_guest = search_memories( + conn, "bot_b", "guest", "pottery", k=4 + ) + assert len(hits_pottery_guest) == 1 + assert hits_pottery_guest[0]["scene_id"] == 2 + + hits_whiteboard_guest = search_memories( + conn, "bot_b", "guest", "whiteboard", k=4 + ) + assert len(hits_whiteboard_guest) == 1 + assert hits_whiteboard_guest[0]["scene_id"] != 2 + + # ---- Witness mask integrity: querying bot_a with witness_role='you' + # over the meanwhile keyword returns NOTHING (witness_you=0 for + # the meanwhile row). The you-scene row's witness_you=1 so a + # 'you' role query would surface IT, but since 'pottery' is + # only in the meanwhile row, the result set is empty. + hits_pottery_you = search_memories( + conn, "bot_a", "you", "pottery", k=4 + ) + assert hits_pottery_you == [], ( + "witness_you mask should filter the meanwhile row out of " + "owner=bot_a/role=you queries" + )