"""End-to-end turn flow (T19): user POSTs prose, server parses, streams via SSE. Covers: - POST ``/chats/{chat_id}/turns`` returns 404 when the chat doesn't exist. - A successful POST appends both a ``user_turn`` and an ``assistant_turn`` event in chronological order. The assistant payload carries the full streamed text and ``truncated=False``. - After a turn lands, the chat detail GET renders the user prose and the assistant text from the event log. """ from __future__ import annotations import json from pathlib import Path import pytest from fastapi.testclient import TestClient from chat.app import app from chat.db.connection import open_db from chat.eventlog.log import append_and_apply, append_event from chat.eventlog.projector import project from chat.llm.mock import MockLLMClient from tests.fixtures import CannedQueue @pytest.fixture def client(tmp_path, monkeypatch): cfg = tmp_path / "config.toml" cfg.write_text('featherless_api_key = "test"\n') monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) db = tmp_path / "test.db" monkeypatch.setenv("CHAT_DB_PATH", str(db)) canned_parse = json.dumps( {"segments": [{"kind": "dialogue", "text": "hello"}]} ) canned_response = "Hi there." # Two state-update classifier calls fire after the assistant_turn # (one per directed edge: bot->you, you->bot). We feed them benign # zero-delta JSON so the existing assertions about ``user_turn`` / # ``assistant_turn`` are unaffected. canned_state_update = json.dumps( {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} ) # T26 scene-close detection runs after the state-update pass. These # tests don't seed an active scene so the classifier is short-circuited # in turns.py — but the canned slot is harmless to leave in place, # and adding it documents the order even when the call doesn't fire. canned_scene_close = json.dumps( {"should_close": False, "reason": "no signal"} ) # Import here so env vars are visible to the dependency lookup. 
from chat.web.kickoff import get_llm_client mock = MockLLMClient( canned=[ canned_parse, canned_response, canned_state_update, canned_state_update, canned_scene_close, ] ) app.dependency_overrides[get_llm_client] = lambda: mock with TestClient(app) as c: # Disable the lifespan-managed background worker — it would # otherwise try to score significance through Featherless with # a fake test API key. Worker behavior is exercised directly in # tests/test_significance.py with a mock LLM factory. app.state.background_worker.enabled = False c.mock_llm = mock # type: ignore[attr-defined] yield c app.dependency_overrides.clear() def _seed(db_path: Path) -> None: """Author a bot, create a chat, and seed enough state for prompt assembly.""" with open_db(db_path) as conn: append_event( conn, kind="bot_authored", payload={ "id": "bot_a", "name": "BotA", "persona": "thoughtful, observant", "voice_samples": [], "traits": [], "backstory": "", "initial_relationship_to_you": "", "kickoff_prose": "...", }, ) append_event( conn, kind="chat_created", payload={ "id": "chat_bot_a", "host_bot_id": "bot_a", "initial_time": "2026-04-26T20:00:00+00:00", "narrative_anchor": "Day 1", "weather": "", }, ) # Seed an edge so the prompt assembler has something to render. append_event( conn, kind="edge_update", payload={ "source_id": "bot_a", "target_id": "you", "chat_id": "chat_bot_a", "knowledge_facts": ["coworker"], }, ) # Activity for both speakers — required by the prompt assembler. 
append_event( conn, kind="activity_change", payload={ "entity_id": "you", "posture": "sitting", "action": { "verb": "talking", "interruptible": True, "required_attention": "low", "expected_duration": "ongoing", }, "attention": "", "holding": [], "status": {}, }, ) append_event( conn, kind="activity_change", payload={ "entity_id": "bot_a", "posture": "sitting", "action": { "verb": "listening", "interruptible": True, "required_attention": "low", "expected_duration": "ongoing", }, "attention": "", "holding": [], "status": {}, }, ) project(conn) def test_post_turn_404_when_chat_missing(client): response = client.post("/chats/no_such/turns", data={"prose": "hello"}) assert response.status_code == 404 def test_post_turn_appends_user_and_assistant_events(client, tmp_path): _seed(tmp_path / "test.db") response = client.post( "/chats/chat_bot_a/turns", data={"prose": "hello"} ) assert response.status_code == 204 with open_db(tmp_path / "test.db") as conn: cur = conn.execute( "SELECT kind, payload_json FROM event_log " "WHERE kind IN ('user_turn', 'assistant_turn') ORDER BY id" ) rows = cur.fetchall() assert len(rows) == 2 assert rows[0][0] == "user_turn" assert rows[1][0] == "assistant_turn" user_payload = json.loads(rows[0][1]) assert user_payload["chat_id"] == "chat_bot_a" assert user_payload["prose"] == "hello" # Segments come from the canned classifier output. assert any( s.get("kind") == "dialogue" and s.get("text") == "hello" for s in user_payload["segments"] ) assistant_payload = json.loads(rows[1][1]) assert assistant_payload["chat_id"] == "chat_bot_a" assert assistant_payload["speaker_id"] == "bot_a" assert assistant_payload["text"] == "Hi there." 
assert assistant_payload["truncated"] is False def test_get_chat_renders_existing_turns(client, tmp_path): _seed(tmp_path / "test.db") post = client.post("/chats/chat_bot_a/turns", data={"prose": "hello"}) assert post.status_code == 204 response = client.get("/chats/chat_bot_a") assert response.status_code == 200 body = response.text assert "hello" in body assert "Hi there." in body # --------------------------------------------------------------------------- # Phase 2 (T44) — multi-entity turn flow. # # These tests cover the post_turn flow when a guest is present: addressee # detection, multi-pair state-update + multi-witness memory writes, and # the optional interjection follow-on. Each test installs its own # MockLLMClient with a canned-response queue tailored to the call shape # of that scenario; the queue is documented at the top of each test so # the orchestration is auditable. # --------------------------------------------------------------------------- def _bot_payload(bot_id: str, name: str, persona: str = "") -> dict: return { "id": bot_id, "name": name, "persona": persona or f"persona for {name}", "voice_samples": [], "traits": [], "backstory": "", "initial_relationship_to_you": "", "kickoff_prose": "...", } def _seed_chat_with_guest(db_path: Path) -> None: """Author host BotA + guest BotB, create a chat with both wired in, and seed an open scene plus minimal activity rows so the prompt assembler sees a third party. 
Edges are seeded for all six directed pairs at the schema-default 50/50 baseline so multi-pair state updates land cleanly.""" with open_db(db_path) as conn: append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB")) append_event( conn, kind="you_authored", payload={"name": "Me", "pronouns": "they/them", "persona": ""}, ) append_event( conn, kind="chat_created", payload={ "id": "chat_bot_a", "host_bot_id": "bot_a", "guest_bot_id": "bot_b", "initial_time": "2026-04-26T20:00:00+00:00", "narrative_anchor": "Day 1", "weather": "", }, ) # Container + open scene so scene_close detection has something # to act on in the per-POV summary test. append_event( conn, kind="container_created", payload={ "chat_id": "chat_bot_a", "name": "office", "type": "workplace", "properties": {}, }, ) append_event( conn, kind="scene_opened", payload={ "chat_id": "chat_bot_a", "container_id": 1, "started_at": "2026-04-26T20:00:00+00:00", "participants": ["you", "bot_a", "bot_b"], }, ) # Seed all six directed edges so state-update writes land on # initialized rows. Knowledge fact on bot_a -> you exercises # the existing-fact preservation path. 
for src, tgt, facts in [ ("bot_a", "you", ["coworker"]), ("you", "bot_a", []), ("bot_b", "you", []), ("you", "bot_b", []), ("bot_a", "bot_b", []), ("bot_b", "bot_a", []), ]: append_event( conn, kind="edge_update", payload={ "source_id": src, "target_id": tgt, "chat_id": "chat_bot_a", "knowledge_facts": facts, }, ) for entity_id, verb in [ ("you", "talking"), ("bot_a", "listening"), ("bot_b", "listening"), ]: append_event( conn, kind="activity_change", payload={ "entity_id": entity_id, "posture": "sitting", "action": { "verb": verb, "interruptible": True, "required_attention": "low", "expected_duration": "ongoing", }, "attention": "", "holding": [], "status": {}, }, ) project(conn) def _override_llm(canned: list[str]) -> MockLLMClient: """Wire a fresh ``MockLLMClient`` and return it so tests can introspect the residual canned queue after the request.""" from chat.web.kickoff import get_llm_client mock = MockLLMClient(canned=list(canned)) app.dependency_overrides[get_llm_client] = lambda: mock return mock def _zero_state() -> str: return json.dumps( {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} ) @pytest.fixture def app_state_setup(tmp_path, monkeypatch): """Same env wiring as the existing ``client`` fixture but without a pre-installed MockLLMClient — the multi-entity tests pin their own canned queues per scenario. """ cfg = tmp_path / "config.toml" cfg.write_text('featherless_api_key = "test"\n') monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) db = tmp_path / "test.db" monkeypatch.setenv("CHAT_DB_PATH", str(db)) with TestClient(app) as c: app.state.background_worker.enabled = False yield c app.dependency_overrides.clear() def test_single_bot_turn_no_guest_regression(app_state_setup, tmp_path): """No-guest regression: the canned-response queue remains parse + narrative + 2 state-updates. Interjection is path-bypassed because the chat has no guest, so ``detect_interjection`` is NOT invoked. 
Ends with one user_turn, one assistant_turn, two post-turn edge_updates (three in total, counting the seeded one), and a single ``memory_written``. T116: migrated to :class:`tests.fixtures.CannedQueue` as a proof of concept for the structured canned-queue builder. """ _seed(tmp_path / "test.db") canned = ( CannedQueue() .parse_turn(segments=[{"kind": "dialogue", "text": "hello"}]) .narrative("Hi there.") .state_update() .state_update() .build() ) mock = _override_llm(canned) try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "hello"} ) assert response.status_code == 204 finally: app.dependency_overrides.clear() # No guest -> no interjection classifier call -> queue fully drained. assert mock._canned == [] with open_db(tmp_path / "test.db") as conn: cur = conn.execute( "SELECT kind FROM event_log " "WHERE kind IN ('user_turn', 'assistant_turn', 'edge_update', " " 'memory_written') ORDER BY id" ) kinds = [r[0] for r in cur.fetchall()] user_turns = [k for k in kinds if k == "user_turn"] assistant_turns = [k for k in kinds if k == "assistant_turn"] edge_updates_after_seed = [k for k in kinds if k == "edge_update"] memory_writes = [k for k in kinds if k == "memory_written"] assert len(user_turns) == 1 assert len(assistant_turns) == 1 # Seed adds exactly one edge_update (bot_a -> you); the post-turn # pass adds two more for a total of three. assert len(edge_updates_after_seed) == 3 assert len(memory_writes) == 1 def test_multi_bot_turn_no_interjection(app_state_setup, tmp_path): """Chat has a guest; ``detect_interjection`` returns False. Verify: 1 user_turn + 1 assistant_turn + 6 *post-turn* edge_updates + 2 memory_written events. Single turn_html broadcast. Canned queue (11 calls): 1. parse_turn 2. detect_addressee (T74.1) -> host 3. narrative stream (primary, addressee = host because the prose doesn't name the guest) 4-9. 6 state-update calls (one per directed pair across {you, bot_a, bot_b}) 10. detect_interjection -> should_interject=False 11. 
detect_scene_close -> should_close=False """ _seed_chat_with_guest(tmp_path / "test.db") canned_parse = json.dumps( {"segments": [{"kind": "dialogue", "text": "hello room"}]} ) canned = [ canned_parse, json.dumps( {"addressee_id": "bot_a", "confidence": "medium", "reason": "host"} ), "Greetings.", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_interject": False, "reason": "calm"}), json.dumps({"should_close": False, "reason": "no signal"}), ] mock = _override_llm(canned) try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "hello room"} ) assert response.status_code == 204 finally: app.dependency_overrides.clear() # All 11 canned slots should have been consumed. assert mock._canned == [] with open_db(tmp_path / "test.db") as conn: # Count post-turn edge_updates (i.e. those after the latest # assistant_turn id). max_at = conn.execute( "SELECT MAX(id) FROM event_log WHERE kind = 'assistant_turn'" ).fetchone()[0] cur = conn.execute( "SELECT COUNT(*) FROM event_log " "WHERE kind = 'edge_update' AND id > ?", (max_at,), ) post_turn_edge_updates = cur.fetchone()[0] cur = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'user_turn'" ) user_turn_count = cur.fetchone()[0] cur = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'assistant_turn'" ) assistant_turn_count = cur.fetchone()[0] cur = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'" ) memory_count = cur.fetchone()[0] assert user_turn_count == 1 assert assistant_turn_count == 1 assert post_turn_edge_updates == 6 assert memory_count == 2 def test_multi_bot_turn_with_interjection(app_state_setup, tmp_path): """Chat has a guest; ``detect_interjection`` returns True. Verify: 1 user_turn + 2 assistant_turns + (6 + 6) post-turn edge_updates + 4 memory_written events. Canned queue (18 calls): 1. parse_turn 2. detect_addressee (T74.1) -> host 3. narrative stream (primary) 4-9. 
6 state-update calls (post-primary) 10. detect_interjection -> should_interject=True 11. narrative stream (interjection) 12-17. 6 state-update calls (post-interjection) 18. detect_scene_close -> should_close=False """ _seed_chat_with_guest(tmp_path / "test.db") canned_parse = json.dumps( {"segments": [{"kind": "dialogue", "text": "tell me"}]} ) canned = [ canned_parse, json.dumps( {"addressee_id": "bot_a", "confidence": "medium", "reason": "host"} ), "Primary beat.", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_interject": True, "reason": "jealous"}), "Interjection beat!", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_close": False, "reason": "no signal"}), ] mock = _override_llm(canned) try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "tell me"} ) assert response.status_code == 204 finally: app.dependency_overrides.clear() assert mock._canned == [] with open_db(tmp_path / "test.db") as conn: cur = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'assistant_turn'" ) assistant_count = cur.fetchone()[0] cur = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'" ) memory_count = cur.fetchone()[0] # All edge_updates after the FIRST assistant_turn are post-turn. first_at = conn.execute( "SELECT MIN(id) FROM event_log WHERE kind = 'assistant_turn'" ).fetchone()[0] post_turn_edges = conn.execute( "SELECT COUNT(*) FROM event_log " "WHERE kind = 'edge_update' AND id > ?", (first_at,), ).fetchone()[0] # Both assistant_turn payloads should reference the same user_turn # and the second one tags ``interjection_of`` the first speaker. 
rows = conn.execute( "SELECT payload_json FROM event_log " "WHERE kind = 'assistant_turn' ORDER BY id" ).fetchall() first_payload = json.loads(rows[0][0]) second_payload = json.loads(rows[1][0]) assert assistant_count == 2 assert memory_count == 4 assert post_turn_edges == 12 assert first_payload["text"] == "Primary beat." assert second_payload["text"] == "Interjection beat!" # The silent witness is the bot that wasn't the primary addressee. assert second_payload["interjection_of"] == first_payload["speaker_id"] assert second_payload["speaker_id"] != first_payload["speaker_id"] assert first_payload["user_turn_id"] == second_payload["user_turn_id"] def test_multi_bot_turn_scene_close_writes_per_pov_summaries( app_state_setup, tmp_path ): """Chat has a guest, prose hard-signals a scene close, classifier confirms. Verify a ``scene_closed`` event lands and per-POV summary rewrites fire for both bots (memory.pov_summary changes for each). Interjection short-circuits at False so the queue stays compact. Canned queue (13 calls): 1. parse_turn 2. detect_addressee (T74.1) -> host 3. narrative stream (primary) 4-9. 6 state-update calls 10. detect_interjection -> False (no follow-on stream) 11. detect_scene_close -> True 12. apply_scene_close_summary host POV 13. 
apply_scene_close_summary guest POV """ _seed_chat_with_guest(tmp_path / "test.db") canned_parse = json.dumps( { "segments": [ {"kind": "narration", "text": "we are done here, fade out"} ] } ) pov_payload = json.dumps( { "summary": "BotA noticed the day winding down.", "knowledge_facts": [], "relationship_summary": "warmer", } ) pov_payload_guest = json.dumps( { "summary": "BotB watched the scene close.", "knowledge_facts": [], "relationship_summary": "warmer", } ) canned = [ canned_parse, json.dumps( {"addressee_id": "bot_a", "confidence": "medium", "reason": "host"} ), "Goodnight.", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_interject": False, "reason": "calm"}), json.dumps({"should_close": True, "reason": "fade out signaled"}), pov_payload, pov_payload_guest, ] mock = _override_llm(canned) try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "we are done here, fade out"} ) assert response.status_code == 204 finally: app.dependency_overrides.clear() assert mock._canned == [] with open_db(tmp_path / "test.db") as conn: cur = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'" ) scene_close_count = cur.fetchone()[0] # One memory_pov_summary manual_edit per witness. cur = conn.execute( "SELECT payload_json FROM event_log WHERE kind = 'manual_edit'" ) manual_edits = [json.loads(r[0]) for r in cur.fetchall()] pov_edits = [ e for e in manual_edits if e.get("target_kind") == "memory_pov_summary" ] # After the rewrite, bot_a's scene-1 memory carries the host POV # and bot_b's scene-1 memory carries the guest POV. host_pov = conn.execute( "SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = 1", ("bot_a",), ).fetchone() guest_pov = conn.execute( "SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = 1", ("bot_b",), ).fetchone() assert scene_close_count == 1 # Two memory rewrites — one per witness. 
assert len(pov_edits) == 2 assert host_pov is not None and "BotA noticed" in host_pov[0] assert guest_pov is not None and "BotB watched" in guest_pov[0] def test_addressee_detection_routes_to_named_bot(app_state_setup, tmp_path): """T74.1: the multi-entity addressee call goes through the classifier; when the classifier returns the guest, the primary turn routes there. Interjection (when fired) makes the host the silent witness and the second assistant_turn carries the host as speaker. Canned queue (with classifier-led addressee = guest): 1. parse_turn 2. detect_addressee -> bot_b (the guest) 3. narrative stream (primary, addressee = guest) 4-9. 6 state-update calls 10. detect_interjection -> True 11. interjection narrative stream 12-17. 6 state-update calls (post-interjection) 18. detect_scene_close -> False """ _seed_chat_with_guest(tmp_path / "test.db") canned_parse = json.dumps( {"segments": [{"kind": "dialogue", "text": "BotB, what do you think?"}]} ) canned = [ canned_parse, json.dumps( { "addressee_id": "bot_b", "confidence": "high", "reason": "user named BotB", } ), "BotB pondering.", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_interject": True, "reason": "host wants in"}), "BotA chiming in.", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_close": False, "reason": "no signal"}), ] mock = _override_llm(canned) try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "BotB, what do you think?"}, ) assert response.status_code == 204 finally: app.dependency_overrides.clear() assert mock._canned == [] with open_db(tmp_path / "test.db") as conn: rows = conn.execute( "SELECT payload_json FROM event_log " "WHERE kind = 'assistant_turn' ORDER BY id" ).fetchall() primary_payload = json.loads(rows[0][0]) interjection_payload = json.loads(rows[1][0]) # Primary speaker is the guest because the addressee classifier # 
picked bot_b for the prose ("BotB, what do you think?"). assert primary_payload["speaker_id"] == "bot_b" # Interjection follow-on goes to the silent witness — the host. assert interjection_payload["speaker_id"] == "bot_a" assert interjection_payload["interjection_of"] == "bot_b" def test_cancelled_turn_still_closes_scene_when_user_prose_signals_close( app_state_setup, tmp_path ): """T74.3 regression: a cancelled primary stream still triggers scene close when the user prose carries a hard close signal. Rationale (also documented in turns.py near the close-detection branch): close detection only consumes the user's prose, which is fully appended to the event_log BEFORE streaming starts. The cancelled bot beat doesn't invalidate the user's intent to close. Implementation: install a MockLLMClient whose ``stream`` raises CancelledError on the first iteration. The classifier calls (parse, addressee, scene_close, per-POV summaries) are still served from the canned queue. The post_turn route ultimately re-raises CancelledError after recording the partial — TestClient may surface that as an exception, so the request is wrapped in a ``try``/``except BaseException`` that tolerates either outcome. Despite the exception, the scene_closed event must land in the event_log. T108 NOTE — this test does NOT actually exercise the cancel path. ``_CancelOnStreamMock.stream`` writes ``raise asyncio.CancelledError`` but ``asyncio`` is not imported at module scope, so the first iteration raises ``NameError`` (caught by ``except Exception:`` in post_turn, which sets ``primary_truncated=True`` but leaves ``cancelled=False``). The function therefore returns 204 normally, the dependency-managed connection commits, and ``scene_closed`` lands. Importing asyncio so the real CancelledError fires reveals a transactional bug: ``post_turn``'s end-of-function re-raise causes ``open_db``'s dependency teardown to skip ``conn.commit()``, rolling back ALL post-cancel writes (user_turn, assistant_turn, edge_updates, scene_closed). 
Deferred for triage — see T108 report. """ from typing import AsyncIterator, Sequence _seed_chat_with_guest(tmp_path / "test.db") canned_parse = json.dumps( {"segments": [{"kind": "narration", "text": "we are done here, fade out"}]} ) pov_payload = json.dumps( { "summary": "BotA noticed the day winding down.", "knowledge_facts": [], "relationship_summary": "warmer", } ) pov_payload_guest = json.dumps( { "summary": "BotB watched the scene close.", "knowledge_facts": [], "relationship_summary": "warmer", } ) # Canned queue: parse + addressee + 6 state-updates + # scene_close=True + 2 per-POV summaries. NO interjection slot # because the cancel path short-circuits the interjection branch. canned = [ canned_parse, json.dumps( {"addressee_id": "bot_a", "confidence": "medium", "reason": "host"} ), # NOTE: no narrative slot — the stream is hijacked below to # raise CancelledError on first iteration; it never pulls a # canned response. _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_close": True, "reason": "fade out signaled"}), pov_payload, pov_payload_guest, ] class _CancelOnStreamMock: """Mock LLM client that serves ``generate`` from a canned queue and raises CancelledError on the FIRST iteration of ``stream``. Mirrors :class:`chat.llm.mock.MockLLMClient` for ``generate`` but diverges on ``stream`` to simulate a mid-stream cancel. """ def __init__(self, canned: list[str]) -> None: self._canned = list(canned) async def generate( self, messages: Sequence, *, model: str, **params ) -> str: return self._canned.pop(0) async def stream( self, messages: Sequence, *, model: str, **params ) -> AsyncIterator[str]: # Raise CancelledError on first iteration to simulate the # /turns/cancel route firing mid-stream. raise asyncio.CancelledError yield # pragma: no cover — keeps this an async generator. 
from chat.web.kickoff import get_llm_client mock = _CancelOnStreamMock(canned=list(canned)) app.dependency_overrides[get_llm_client] = lambda: mock try: # FastAPI/Starlette handles the re-raised CancelledError as an # internal failure — TestClient surfaces it as a 500 response. # We don't assert on the status here; the regression is whether # the scene_closed event still landed in the event_log. try: app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "we are done here, fade out"}, ) except BaseException: # Some Starlette/asyncio versions propagate the # CancelledError out of the test client; that's fine — the # partial-record + scene-close still ran before the raise. pass finally: app.dependency_overrides.clear() with open_db(tmp_path / "test.db") as conn: scene_close_count = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'" ).fetchone()[0] assistant_payload = conn.execute( "SELECT payload_json FROM event_log " "WHERE kind = 'assistant_turn' ORDER BY id" ).fetchall() # T108: pin the ordering — user_turn must commit before # scene_closed (close detection runs on prose that is already # in the event_log) and any assistant_turn the cancel produced # must land between the two (truncated record written before # the close decision). ordered = conn.execute( "SELECT id, kind FROM event_log " "WHERE kind IN ('user_turn', 'scene_closed', 'assistant_turn') " "ORDER BY id" ).fetchall() # Scene close lands despite the cancel. assert scene_close_count == 1 # The cancelled assistant_turn was still recorded (truncated=True). assert len(assistant_payload) == 1 assert json.loads(assistant_payload[0][0])["truncated"] is True # T108 ordering pin: user_turn lands first, the truncated # assistant_turn (if any) is committed BEFORE the scene_close # decision fires, and scene_closed lands last. 
Close detection # relies on user prose being committed to the event_log BEFORE # the close decision runs — and the cancelled assistant beat is # recorded as a partial before close-detection too. kinds_in_order = [row[1] for row in ordered] user_idx = kinds_in_order.index("user_turn") close_idx = kinds_in_order.index("scene_closed") assert user_idx < close_idx if "assistant_turn" in kinds_in_order: assert user_idx < kinds_in_order.index("assistant_turn") < close_idx def test_interjection_enqueues_significance_job(app_state_setup, tmp_path): """T74.2: when an interjection fires, the interjection memory is enqueued for significance scoring just like the primary memory. Capture enqueued ``SignificanceJob``s by replacing the background worker's ``enqueue`` method with a list-append. Without T74.2, the interjection memory would never be scored — only the primary's enqueue would land. We therefore expect TWO jobs after a turn that has both a primary and an interjection beat: one for the primary memory, one for the interjection memory. """ _seed_chat_with_guest(tmp_path / "test.db") canned_parse = json.dumps( {"segments": [{"kind": "dialogue", "text": "tell me"}]} ) canned = [ canned_parse, json.dumps( {"addressee_id": "bot_a", "confidence": "medium", "reason": "host"} ), "Primary beat.", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_interject": True, "reason": "jealous"}), "Interjection beat!", _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), _zero_state(), json.dumps({"should_close": False, "reason": "no signal"}), ] _override_llm(canned) captured_jobs: list = [] worker = app.state.background_worker # Re-enable enqueue capture even though the worker's loop is disabled # — we want to count enqueues without the loop running classifier work. 
worker.enabled = True original_enqueue = worker.enqueue worker.enqueue = captured_jobs.append # type: ignore[assignment] try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "tell me"} ) assert response.status_code == 204 finally: worker.enqueue = original_enqueue # type: ignore[assignment] worker.enabled = False app.dependency_overrides.clear() # Expect 2 enqueues: 1 for the primary memory + 1 for the # interjection memory. assert len(captured_jobs) == 2 # Both jobs should reference distinct memory ids — the primary's # host-POV memory and the interjection's host-POV memory. memory_ids = [job.memory_id for job in captured_jobs] assert len(set(memory_ids)) == 2 # The two narrative texts should be the two streamed beats. narrative_texts = sorted(job.narrative_text for job in captured_jobs) assert narrative_texts == ["Interjection beat!", "Primary beat."] # --------------------------------------------------------------------------- # Phase 3 (T61) — per-turn event-lifecycle detection + completion promotion. # # After the post-turn classifier passes (memory write, state update, # interjection check) and BEFORE scene-close detection, ``post_turn`` # calls :func:`detect_event_transitions`. Each transition becomes one # of ``event_started`` / ``event_completed`` / ``event_cancelled``. A # completed event is followed inline by ``promote_completed_event`` so # the props it carries (knowledge_facts, etc.) land in state # synchronously. # # When no active events are seeded the classifier short-circuits without # an LLM call (per T52) — the canned queue therefore needs ZERO extra # slots in that case. # --------------------------------------------------------------------------- def test_turn_with_event_transition_appends_started_event( app_state_setup, tmp_path ): """A planned event becomes active when the classifier reports a ``new_status='active'`` transition for that event_id. Canned queue (5 calls — single-bot, no scene seeded): 1. parse_turn 2. 
narrative stream 3. state-update bot_a -> you 4. state-update you -> bot_a 5. detect_event_transitions -> 1 transition (active) """ _seed(tmp_path / "test.db") # Seed a planned event so list_active_events returns 1 row. Use # append_and_apply so we don't re-replay the prior chat_created event # (whose handler is INSERT-not-IGNORE and would 409 on replay). with open_db(tmp_path / "test.db") as conn: append_and_apply( conn, kind="event_planned", payload={ "event_id": "evt_1", "chat_id": "chat_bot_a", "kind": "story_event", "props": {}, "planned_for": "2026-04-30T18:00:00+00:00", }, ) # T116: migrated to :class:`tests.fixtures.CannedQueue`. canned = ( CannedQueue() .parse_turn(segments=[{"kind": "dialogue", "text": "they arrived"}]) .narrative("They walk in.") .state_update() .state_update() .detect_event_transitions( [ { "event_id": "evt_1", "new_status": "active", "reason": "they arrived", } ] ) .build() ) mock = _override_llm(canned) try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "they arrived"} ) assert response.status_code == 204 finally: app.dependency_overrides.clear() # All 5 canned slots consumed. assert mock._canned == [] with open_db(tmp_path / "test.db") as conn: # event_started landed in event_log. rows = conn.execute( "SELECT payload_json FROM event_log " "WHERE kind = 'event_started' ORDER BY id" ).fetchall() assert len(rows) == 1 started_payload = json.loads(rows[0][0]) assert started_payload["event_id"] == "evt_1" assert started_payload["started_at"] == "2026-04-26T20:00:00+00:00" # T114.1: payload carries the back-reference to the assistant_turn # that triggered the transition. The assistant_turn lands in # event_log immediately before the event_started, so its id is # the largest assistant_turn id in the chat at this point. 
# Resolve the newest assistant_turn id recorded for this chat.
        at_id = conn.execute(
            "SELECT id FROM event_log "
            "WHERE kind = 'assistant_turn' "
            " AND json_extract(payload_json, '$.chat_id') = 'chat_bot_a' "
            "ORDER BY id DESC LIMIT 1"
        ).fetchone()[0]
        assert started_payload["triggered_by_assistant_turn_id"] == at_id

        # The events projection row reflects the active status.
        ev_row = conn.execute(
            "SELECT status, started_at FROM events WHERE event_id = ?",
            ("evt_1",),
        ).fetchone()
        assert ev_row is not None
        assert ev_row[0] == "active"
        assert ev_row[1] == "2026-04-26T20:00:00+00:00"


def test_turn_with_event_completion_runs_promotion(app_state_setup, tmp_path):
    """An active event with knowledge_facts in props completes; the inline
    call to ``promote_completed_event`` emits the corresponding
    ``edge_update``.

    Asserts that exactly one ``event_completed`` row lands after the
    pre-turn snapshot of ``event_log`` and that an ``edge_update`` logged
    after it, tagged ``source=event_promotion``, carries the seeded fact.
    """
    _seed(tmp_path / "test.db")

    # Seed: planned -> started so the event is currently active. Props
    # carry a knowledge_fact that promotion will turn into an edge_update.
    # Use append_and_apply (not project) to avoid re-replaying chat_created.
    with open_db(tmp_path / "test.db") as conn:
        append_and_apply(
            conn,
            kind="event_planned",
            payload={
                "event_id": "evt_2",
                "chat_id": "chat_bot_a",
                "kind": "story_event",
                "props": {
                    "knowledge_facts": [
                        {
                            "owner_id": "bot_a",
                            "target_id": "you",
                            "fact": "Maya likes pottery",
                        }
                    ]
                },
                "planned_for": "2026-04-30T18:00:00+00:00",
            },
        )
        append_and_apply(
            conn,
            kind="event_started",
            payload={
                "event_id": "evt_2",
                "started_at": "2026-04-30T19:00:00+00:00",
            },
        )

    # Snapshot the max event_log id so we can assert on rows AFTER the turn.
    with open_db(tmp_path / "test.db") as conn:
        before_id = conn.execute(
            "SELECT COALESCE(MAX(id), 0) FROM event_log"
        ).fetchone()[0]

    canned_parse = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "we wrap it up"}]}
    )
    canned_event_decision = json.dumps(
        {
            "transitions": [
                {
                    "event_id": "evt_2",
                    "new_status": "completed",
                    "reason": "wrapped",
                }
            ]
        }
    )
    # Five slots, listed in the order the handler is expected to consume
    # them: parse, narrative text, two state-updates, then the
    # event-transition decision. ``_zero_state()`` is defined elsewhere in
    # this file — presumably the zero-delta state-update payload.
    mock = _override_llm(
        [
            canned_parse,
            "They wrap it up.",
            _zero_state(),
            _zero_state(),
            canned_event_decision,
        ]
    )
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "we wrap it up"}
        )
        assert response.status_code == 204
    finally:
        # Clear dependency overrides even if the request or the status
        # assertion above fails.
        app.dependency_overrides.clear()

    # Every canned slot was consumed, including the event-transition one.
    assert mock._canned == []

    with open_db(tmp_path / "test.db") as conn:
        # event_completed landed.
        completed_rows = conn.execute(
            "SELECT id, payload_json FROM event_log "
            "WHERE kind = 'event_completed' AND id > ? ORDER BY id",
            (before_id,),
        ).fetchall()
        assert len(completed_rows) == 1
        completed_payload = json.loads(completed_rows[0][1])
        assert completed_payload["event_id"] == "evt_2"
        completed_id = completed_rows[0][0]

        # promote_completed_event ran inline AFTER event_completed: the
        # follow-on edge_update carries the knowledge fact and is tagged
        # with source=event_promotion.
        promo_rows = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'edge_update' AND id > ? ORDER BY id",
            (completed_id,),
        ).fetchall()
        promo_facts: list[str] = []
        # Only edge_updates tagged source=event_promotion are collected;
        # any other edge_update logged after the completion is ignored.
        for (payload_json,) in promo_rows:
            p = json.loads(payload_json)
            if p.get("source") == "event_promotion":
                promo_facts.extend(p.get("knowledge_facts") or [])
        assert "Maya likes pottery" in promo_facts


def test_turn_with_no_active_events_skips_classifier(app_state_setup, tmp_path):
    """When no active events are seeded, ``detect_event_transitions``
    short-circuits without an LLM call (per T52). The canned queue must
    therefore have ZERO event-detection slots — same shape as the
    Phase 2 no-guest baseline.

    T116: migrated to :class:`tests.fixtures.CannedQueue`.
    """
    _seed(tmp_path / "test.db")

    # Only 4 slots: parse + narrative + 2 state-updates. NO extra slot for
    # event-detection — non-existent active_events causes the helper to
    # short-circuit before pulling from the queue.
    canned = (
        CannedQueue()
        .parse_turn(segments=[{"kind": "dialogue", "text": "hello"}])
        .narrative("Hi there.")
        .state_update()
        .state_update()
        .build()
    )
    mock = _override_llm(canned)
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "hello"}
        )
        assert response.status_code == 204
    finally:
        app.dependency_overrides.clear()

    # Queue fully drained — no canned slot was consumed by event detection.
    assert mock._canned == []

    with open_db(tmp_path / "test.db") as conn:
        # None of the three event-transition kinds may appear in the log.
        for kind in ("event_started", "event_completed", "event_cancelled"):
            count = conn.execute(
                "SELECT COUNT(*) FROM event_log WHERE kind = ?", (kind,)
            ).fetchone()[0]
            assert count == 0, f"expected zero {kind} events, got {count}"


# ---------------------------------------------------------------------------
# Phase 3 (T62) — natural-language skip-command surface.
#
# The classifier may flag prose as a time-skip directive via
# ``ParsedTurn.intent``. Elision runs through the shared controller in
# :mod:`chat.web.skip` and short-circuits the regular narrative path;
# jump returns 422 directing the user to the drawer's structured form
# (Phase 3 simpler path — natural-language jump time derivation is too
# fragile for v1 without the structured surface).
# ---------------------------------------------------------------------------


def test_elision_skip_via_natural_language(app_state_setup, tmp_path):
    """User prose 'skip to when we arrive at the park' classifies as
    ``intent='skip_elision'``. The post_turn handler short-circuits the
    narrative path, advances the chat clock by an hour stub, appends a
    ``time_skip_elision`` event AND an ``assistant_turn`` carrying the
    canned narration. No ``user_turn`` is emitted on the skip path.

    Canned queue: 1 parse_turn (intent=skip_elision) + 1 narration string
    (consumed by ``narrate_skip``). No state-update / scene-close /
    event-detection slots — those branches are bypassed entirely.
    """
    _seed(tmp_path / "test.db")

    canned_parse = json.dumps(
        {
            "segments": [
                {"kind": "dialogue", "text": "skip to when we arrive at the park"}
            ],
            "intent": "skip_elision",
            "landing_state_hint": "we arrive at the park",
        }
    )
    canned_narration = "We pull up to the park entrance, sun low in the sky."
    mock = _override_llm([canned_parse, canned_narration])
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns",
            data={"prose": "skip to when we arrive at the park"},
        )
        assert response.status_code == 204
    finally:
        app.dependency_overrides.clear()

    # Both canned slots drained — no other classifier branches ran.
    assert mock._canned == []

    with open_db(tmp_path / "test.db") as conn:
        # time_skip_elision landed.
        skip_rows = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'time_skip_elision' ORDER BY id"
        ).fetchall()
        assert len(skip_rows) == 1
        sp = json.loads(skip_rows[0][0])
        assert sp["chat_id"] == "chat_bot_a"
        # 1-hour stub from the seeded chat clock (20:00 -> 21:00).
        assert sp["new_time"].startswith("2026-04-26T21:00:00")

        # Chat clock advanced via the projector.
        from chat.state.world import get_chat

        chat = get_chat(conn, "chat_bot_a")
        assert chat["time"].startswith("2026-04-26T21:00:00")

        # An assistant_turn carrying the canned narration was appended.
        turn_rows = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'assistant_turn' ORDER BY id"
        ).fetchall()
        assert len(turn_rows) == 1
        tp = json.loads(turn_rows[0][0])
        assert tp["chat_id"] == "chat_bot_a"
        assert tp["text"] == canned_narration
        assert tp["speaker_id"] == "bot_a"
        assert tp["truncated"] is False

        # No user_turn lands on the skip path — the natural-language
        # skip is a command, not a beat the bots should remember.
        user_count = conn.execute(
            "SELECT COUNT(*) FROM event_log WHERE kind = 'user_turn'"
        ).fetchone()[0]
        assert user_count == 0


def test_jump_skip_via_natural_language_returns_422(app_state_setup, tmp_path):
    """User prose 'next morning' classifies as ``intent='skip_jump'``. The
    handler returns 422 with a guidance payload pointing the author at the
    drawer's structured jump form. No event is emitted — the drawer form
    is the only entry point for jump skips in Phase 3.
    """
    _seed(tmp_path / "test.db")

    canned_parse = json.dumps(
        {
            "segments": [{"kind": "dialogue", "text": "next morning"}],
            "intent": "skip_jump",
            "landing_state_hint": "",
        }
    )
    # Only one canned slot — parse — because the 422 fallback short-
    # circuits before any other classifier runs.
    mock = _override_llm([canned_parse])
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "next morning"}
        )
        assert response.status_code == 422
        body = response.json()
        # Guidance payload mentions the drawer so the client can surface
        # the right CTA; we don't pin the exact wording.
        assert "drawer" in body.get("error", "").lower()
    finally:
        app.dependency_overrides.clear()

    # Parse slot consumed; no follow-on classifier calls.
    assert mock._canned == []

    with open_db(tmp_path / "test.db") as conn:
        # Neither turn kind nor either skip kind may have been logged.
        for kind in (
            "user_turn",
            "assistant_turn",
            "time_skip_elision",
            "time_skip_jump",
        ):
            count = conn.execute(
                "SELECT COUNT(*) FROM event_log WHERE kind = ?", (kind,)
            ).fetchone()[0]
            assert count == 0, f"expected zero {kind} on jump-via-NL, got {count}"


def test_skip_command_does_not_run_narrative_classifier(
    app_state_setup, tmp_path, monkeypatch
):
    """The skip dispatch branch must bypass the narrative-prompt assembly
    entirely. We monkeypatch ``assemble_narrative_prompt`` (re-bound on the
    ``chat.web.turns`` module since the handler imports it by name) and
    assert the call count is zero after the elision skip lands.
    """
    _seed(tmp_path / "test.db")

    canned_parse = json.dumps(
        {
            "segments": [
                {"kind": "dialogue", "text": "skip to when we arrive at the park"}
            ],
            "intent": "skip_elision",
            "landing_state_hint": "we arrive at the park",
        }
    )
    canned_narration = "We arrive moments later."
    mock = _override_llm([canned_parse, canned_narration])

    # Mutable counter so the nested spy can record calls without
    # ``nonlocal``.
    call_counter = {"n": 0}

    def _spy(*args, **kwargs):
        # The empty-list return is only a placeholder: the test passes
        # only if this spy is never called.
        call_counter["n"] += 1
        return []

    # Patch the symbol at the handler's import site so we can assert
    # the skip path bypasses prompt assembly even when the symbol still
    # exists in the module namespace.
    from chat.web import turns as turns_mod

    monkeypatch.setattr(turns_mod, "assemble_narrative_prompt", _spy)

    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns",
            data={"prose": "skip to when we arrive at the park"},
        )
        assert response.status_code == 204
    finally:
        app.dependency_overrides.clear()

    # Both canned slots (parse + narration) were consumed.
    assert mock._canned == []
    assert call_counter["n"] == 0, (
        "assemble_narrative_prompt was called on the skip path; the "
        "natural-language skip dispatch must bypass narrative assembly."
    )


# ---------------------------------------------------------------------------
# Phase 3.5 (T82.1) — post_turn consumes pending meanwhile digests.
#
# The helper ``consume_pending_meanwhile_digests`` lives in
# chat.services.prompt and is now wired into the END of post_turn (after
# scene-close detection, before the response broadcast). This pins the
# wiring so future refactors don't accidentally drop the call and leave
# digests pending forever.
# ---------------------------------------------------------------------------


def test_post_turn_consumes_pending_meanwhile_digests(
    app_state_setup, tmp_path
):
    """Seed a pending meanwhile digest via ``meanwhile_digest_created``,
    POST a regular you-turn through post_turn, and assert:

    1. A ``meanwhile_digest_consumed`` event lands in the event_log.
    2. ``list_pending_meanwhile_digests`` returns empty after the turn.

    The post_turn flow surfaces the digest in the prompt (T65) and then
    consumes it (T82.1) so the next turn starts clean.
    """
    _seed(tmp_path / "test.db")
    db_path = tmp_path / "test.db"

    # Seed a pending digest directly via the projection event. The scene_id
    # field doesn't need to reference an existing meanwhile scene for the
    # digest table — the FK is on the digest payload only.
    with open_db(db_path) as conn:
        append_and_apply(
            conn,
            kind="meanwhile_digest_created",
            payload={
                "scene_id": 99,
                "chat_id": "chat_bot_a",
                "summary": "While you were away, the bots talked.",
            },
        )
        # Confirm the digest is pending before the turn lands.
        from chat.state.meanwhile import list_pending_meanwhile_digests

        assert len(list_pending_meanwhile_digests(conn, "chat_bot_a")) == 1

    canned_parse = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "hello"}]}
    )
    # Standard 4-slot queue: parse + narrative + 2 state-updates. No
    # active scene so scene-close detection short-circuits without an LLM
    # call (consistent with the no-guest regression test).
    mock = _override_llm(
        [canned_parse, "Hi there.", _zero_state(), _zero_state()]
    )
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "hello"}
        )
        assert response.status_code == 204
    finally:
        app.dependency_overrides.clear()

    # All canned slots drained — no extra classifier calls fired.
    assert mock._canned == []

    with open_db(db_path) as conn:
        # A meanwhile_digest_consumed event landed for the seeded digest.
        consumed_rows = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'meanwhile_digest_consumed' ORDER BY id"
        ).fetchall()
        assert len(consumed_rows) == 1

        # The pending list is empty after consumption.
        # (Same helper as imported in the seeding block above; the repeated
        # function-local import is redundant but harmless.)
        from chat.state.meanwhile import list_pending_meanwhile_digests

        assert list_pending_meanwhile_digests(conn, "chat_bot_a") == []


# ---------------------------------------------------------------------------
# Phase 3.5 (T82.2) — natural-language skip runs scene close detection.
#
# A user typing "fade out, skip an hour" should close the scene FIRST
# (so the close summary captures the closing scene's final beat) and
# THEN run the elision skip. Without this wiring, the skip dispatch
# branch bypasses scene close entirely.
# ---------------------------------------------------------------------------


def test_natural_language_skip_with_close_signal_closes_scene(
    app_state_setup, tmp_path
):
    """A skip phrased in prose that also signals a scene close does both.

    The prose "fade out, skip to morning" is parsed as
    ``intent=skip_elision`` while the scene-close classifier answers
    ``should_close=True``. The handler is then expected to:

    1. Log a ``scene_closed`` event.
    2. Run ``apply_scene_close_summary`` for the scene being closed.
    3. Log a ``time_skip_elision`` event with a HIGHER event_log id than
       the ``scene_closed`` row — close first, skip second.

    Canned LLM slots, in consumption order (single bot, one open scene,
    no dialogue rows recorded in it yet):

    1. parse_turn -> intent=skip_elision
    2. detect_scene_close -> should_close=True
    3. apply_scene_close_summary, host POV
    4. narrate_skip narration

    There is no thread-detection slot: detect_threads (T58.2 fires on
    every close) short-circuits on an empty scene-scoped transcript, and
    no user/assistant turn landed in scene 1 before the close.
    """
    database = tmp_path / "test.db"

    # One activity_change per speaker; only the entity id and verb differ.
    activity_events = [
        (
            "activity_change",
            {
                "entity_id": who,
                "posture": "sitting",
                "action": {
                    "verb": doing,
                    "interruptible": True,
                    "required_attention": "low",
                    "expected_duration": "ongoing",
                },
                "attention": "",
                "holding": [],
                "status": {},
            },
        )
        for who, doing in (("you", "talking"), ("bot_a", "listening"))
    ]

    # Full seed, in append order. The container + scene_opened pair gives
    # detect_scene_close an open scene to act on.
    seed_events = [
        (
            "bot_authored",
            {
                "id": "bot_a",
                "name": "BotA",
                "persona": "thoughtful, observant",
                "voice_samples": [],
                "traits": [],
                "backstory": "",
                "initial_relationship_to_you": "",
                "kickoff_prose": "...",
            },
        ),
        (
            "chat_created",
            {
                "id": "chat_bot_a",
                "host_bot_id": "bot_a",
                "initial_time": "2026-04-26T20:00:00+00:00",
                "narrative_anchor": "Day 1",
                "weather": "",
            },
        ),
        (
            "container_created",
            {
                "chat_id": "chat_bot_a",
                "name": "office",
                "type": "workplace",
                "properties": {},
            },
        ),
        (
            "scene_opened",
            {
                "chat_id": "chat_bot_a",
                "container_id": 1,
                "started_at": "2026-04-26T20:00:00+00:00",
                "participants": ["you", "bot_a"],
            },
        ),
        (
            "edge_update",
            {
                "source_id": "bot_a",
                "target_id": "you",
                "chat_id": "chat_bot_a",
                "knowledge_facts": ["coworker"],
            },
        ),
        *activity_events,
    ]

    with open_db(database) as conn:
        for event_kind, event_payload in seed_events:
            append_event(conn, kind=event_kind, payload=event_payload)
        project(conn)

    # Canned LLM replies, in the order listed in the docstring.
    llm_slots = [
        json.dumps(
            {
                "segments": [
                    {"kind": "narration", "text": "fade out, skip to morning"}
                ],
                "intent": "skip_elision",
                "landing_state_hint": "morning at home",
            }
        ),
        json.dumps({"should_close": True, "reason": "fade out signaled"}),
        json.dumps(
            {
                "summary": "BotA noticed the day winding down.",
                "knowledge_facts": [],
                "relationship_summary": "warmer",
            }
        ),
        "The night fades and morning arrives.",
    ]
    mock = _override_llm(llm_slots)

    try:
        reply = app_state_setup.post(
            "/chats/chat_bot_a/turns",
            data={"prose": "fade out, skip to morning"},
        )
        assert reply.status_code == 204
    finally:
        app.dependency_overrides.clear()

    # Nothing left in the queue: both the close and the skip consumed
    # their slots.
    assert mock._canned == []

    with open_db(database) as conn:
        closed_ids, skipped_ids = [
            conn.execute(sql).fetchall()
            for sql in (
                "SELECT id FROM event_log WHERE kind = 'scene_closed'",
                "SELECT id FROM event_log WHERE kind = 'time_skip_elision'",
            )
        ]
        assert len(closed_ids) == 1, "scene_closed must land"
        assert len(skipped_ids) == 1, "time_skip_elision must land"
        # The close has to be logged ahead of the skip.
        assert closed_ids[0][0] < skipped_ids[0][0], (
            "scene_closed must precede time_skip_elision in the event_log"
        )