diff --git a/chat/services/regenerate.py b/chat/services/regenerate.py index c92c0f0..eff02a2 100644 --- a/chat/services/regenerate.py +++ b/chat/services/regenerate.py @@ -26,6 +26,22 @@ Phase 1 simplifications (per the plan's "bound it" guidance): so affinity/trust/knowledge reflect the new output. - The route does not broadcast a fresh ``turn_html`` SSE event; T34 polishes UI swaps. The user refreshes the page to see the new turn. + +Phase 2 changes (T44): + +- Multi-entity prompt assembly: ``guest_id`` is forwarded to the + prompt assembler so the regenerated narrative sees the same + guest-aware context the original turn did. +- Multi-witness memory write: ``record_turn_memory_for_present`` fans + out one ``memory_written`` event per witness when a guest is present. +- Multi-pair state-update: ``compute_state_updates_for_present`` emits + one ``edge_update`` per directed pair across present entities. With + three present that's six edges instead of two. +- Interjection regeneration is **deferred to Phase 2.5**. Regenerate + only re-streams the addressee turn for v2; ``detect_interjection`` + is not invoked here. If the prior turn fired an interjection it + remains attached to the original assistant_turn (which is superseded + alongside the regenerated turn) — Phase 2.5 will revisit. 
""" from __future__ import annotations @@ -35,9 +51,9 @@ from sqlite3 import Connection from chat.config import Settings from chat.eventlog.log import append_and_apply, append_event -from chat.services.memory_write import record_turn_memory +from chat.services.memory_write import record_turn_memory_for_present +from chat.services.multi_state_update import compute_state_updates_for_present from chat.services.prompt import assemble_narrative_prompt -from chat.services.state_update import compute_state_update from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you from chat.state.world import active_scene, get_chat @@ -72,6 +88,16 @@ async def regenerate_assistant_turn( "persona": "", } + # Phase 2: surface the guest (if any) so the prompt assembler and + # downstream multi-entity passes see the same shape post_turn does. + guest_bot_id = chat.get("guest_bot_id") + guest_bot: dict | None = None + if guest_bot_id is not None: + guest_bot = get_bot(conn, guest_bot_id) + if guest_bot is None: + # Stale guest reference — degrade to single-bot regenerate. + guest_bot_id = None + # 1. Locate the original assistant_turn event. row = conn.execute( "SELECT payload_json FROM event_log " @@ -82,6 +108,17 @@ async def regenerate_assistant_turn( raise ValueError("assistant_turn event not found") original_assistant_payload = json.loads(row[0]) original_user_turn_id = original_assistant_payload.get("user_turn_id") + # Phase 2 v2 regenerates only the addressee turn — preserve whichever + # bot the original turn was attributed to, falling back to the host + # for legacy rows that pre-date multi-entity support. 
+ speaker_bot_id = original_assistant_payload.get("speaker_id") or host_bot_id + if speaker_bot_id == host_bot_id: + speaker_bot = host_bot + elif guest_bot is not None and speaker_bot_id == guest_bot.get("id"): + speaker_bot = guest_bot + else: + speaker_bot = get_bot(conn, speaker_bot_id) or host_bot + speaker_bot_id = speaker_bot.get("id", host_bot_id) # 2. Determine the prose for the new prompt and (when edited) capture # the user_turn_edit event up front so the new event ids exist before @@ -137,20 +174,26 @@ async def regenerate_assistant_turn( if kind in ("user_turn", "user_turn_edit"): recent.append({"speaker": you_name, "text": p.get("prose", "")}) else: - recent.append( - {"speaker": host_bot.get("name", "bot"), "text": p.get("text", "")} - ) + spk = p.get("speaker_id", "bot") + spk_name = host_bot.get("name", "bot") + if spk == host_bot_id: + spk_name = host_bot.get("name", "bot") + elif guest_bot is not None and spk == guest_bot.get("id"): + spk_name = guest_bot.get("name", "bot") + recent.append({"speaker": spk_name, "text": p.get("text", "")}) # 4. Assemble the narrative prompt. ``recent`` already excludes the # current user prose, which we pass through ``user_turn_prose``. + # Phase 2: forward ``guest_id`` so the prompt sees the third party. messages = assemble_narrative_prompt( conn, chat_id=chat_id, - speaker_bot_id=host_bot_id, + speaker_bot_id=speaker_bot_id, user_turn_prose=prose_for_prompt or None, recent_dialogue=recent, budget_soft=settings.narrative_budget_soft, budget_hard=settings.narrative_budget_hard, + guest_id=guest_bot_id, ) # 5. Stream the new narrative. 
@@ -164,7 +207,7 @@ async def regenerate_assistant_turn( accumulated.append(chunk) await publish( chat_id, - {"event": "token", "text": chunk, "speaker_id": host_bot_id}, + {"event": "token", "text": chunk, "speaker_id": speaker_bot_id}, ) new_text = "".join(accumulated) @@ -177,7 +220,7 @@ async def regenerate_assistant_turn( kind="assistant_turn", payload={ "chat_id": chat_id, - "speaker_id": host_bot_id, + "speaker_id": speaker_bot_id, "text": new_text, "truncated": False, "user_turn_id": ( @@ -196,88 +239,83 @@ async def regenerate_assistant_turn( ) # 8. Re-run downstream classifier passes (memory write + state update - # for both directed edges). Significance is intentionally skipped on - # regenerate (the prior score remains attached to the prior memory). + # for every directed pair across present entities). Significance is + # intentionally skipped on regenerate (the prior score remains + # attached to the prior memory). Phase 2.5 will add interjection + # regeneration; v2 leaves any prior interjection beat in place. scene = active_scene(conn, chat_id) - record_turn_memory( + record_turn_memory_for_present( conn, chat_id=chat_id, host_bot_id=host_bot_id, + guest_bot_id=guest_bot_id, narrative_text=new_text, scene_id=scene["id"] if scene else None, chat_clock_at=chat.get("time"), ) last_at = chat.get("time") + speaker_name = ( + speaker_bot.get("name", "bot") if speaker_bot is not None else "bot" + ) recent_for_update = recent + [ - {"speaker": host_bot.get("name", "bot"), "text": new_text} + {"speaker": speaker_name, "text": new_text} ] - edge_b2y = get_edge(conn, host_bot_id, "you") or { - "affinity": 50, - "trust": 50, - "summary": "", + # Build present-entity inputs for the multi-pair state-update pass. + # Host first preserves the Phase 1 directed-pair order (host->you, + # then you->host) so existing canned-response fixtures still line up. 
+ present_ids: list[str] = [host_bot_id, "you"] + present_names: dict[str, str] = { + host_bot_id: host_bot.get("name", "bot"), + "you": you_name, } - update_b2y = await compute_state_update( - client, - model=settings.classifier_model, - source_id=host_bot_id, - target_id="you", - source_name=host_bot.get("name", "bot"), - source_persona=host_bot.get("persona", "") or "", - target_name=you_name, - prior_affinity=edge_b2y["affinity"], - prior_trust=edge_b2y["trust"], - prior_summary=edge_b2y.get("summary", "") or "", - recent_dialogue=recent_for_update, - ) - append_and_apply( - conn, - kind="edge_update", - payload={ - "source_id": host_bot_id, - "target_id": "you", - "chat_id": chat_id, - "affinity_delta": update_b2y.affinity_delta, - "trust_delta": update_b2y.trust_delta, - "knowledge_facts": update_b2y.knowledge_facts, - "last_interaction_at": last_at, - "last_interaction_chat_id": chat_id, - }, - ) + personas: dict[str, str] = { + host_bot_id: host_bot.get("persona") or "", + "you": you_entity.get("persona") or "", + } + if guest_bot is not None and guest_bot_id is not None: + present_ids.append(guest_bot_id) + present_names[guest_bot_id] = guest_bot.get("name", "bot") + personas[guest_bot_id] = guest_bot.get("persona") or "" - edge_y2b = get_edge(conn, "you", host_bot_id) or { - "affinity": 50, - "trust": 50, - "summary": "", - } - update_y2b = await compute_state_update( + prior_edges: dict[tuple[str, str], dict] = {} + for src in present_ids: + for tgt in present_ids: + if src == tgt: + continue + edge = get_edge(conn, src, tgt) or { + "affinity": 50, + "trust": 50, + "summary": "", + } + prior_edges[(src, tgt)] = edge + + state_updates = await compute_state_updates_for_present( client, - model=settings.classifier_model, - source_id="you", - target_id=host_bot_id, - source_name=you_name, - source_persona=you_entity.get("persona", "") or "", - target_name=host_bot.get("name", "bot"), - prior_affinity=edge_y2b["affinity"], - prior_trust=edge_y2b["trust"], - 
prior_summary=edge_y2b.get("summary", "") or "", + classifier_model=settings.classifier_model, + present_ids=present_ids, + present_names=present_names, + personas=personas, + prior_edges=prior_edges, recent_dialogue=recent_for_update, + timeout_s=settings.classifier_timeout_s, ) - append_and_apply( - conn, - kind="edge_update", - payload={ - "source_id": "you", - "target_id": host_bot_id, - "chat_id": chat_id, - "affinity_delta": update_y2b.affinity_delta, - "trust_delta": update_y2b.trust_delta, - "knowledge_facts": update_y2b.knowledge_facts, - "last_interaction_at": last_at, - "last_interaction_chat_id": chat_id, - }, - ) + for src_id, tgt_id, update in state_updates: + append_and_apply( + conn, + kind="edge_update", + payload={ + "source_id": src_id, + "target_id": tgt_id, + "chat_id": chat_id, + "affinity_delta": update.affinity_delta, + "trust_delta": update.trust_delta, + "knowledge_facts": update.knowledge_facts, + "last_interaction_at": last_at, + "last_interaction_chat_id": chat_id, + }, + ) return new_text diff --git a/chat/web/turns.py b/chat/web/turns.py index 894dc72..940afbf 100644 --- a/chat/web/turns.py +++ b/chat/web/turns.py @@ -1,32 +1,47 @@ """POST ``/chats//turns`` — narrative turn flow with SSE streaming. The turn flow strings together the pieces built in T17 (turn parser), T18 -(prompt assembler), and T16 (SSE channel): +(prompt assembler), and T16 (SSE channel). Phase 2 (T44) extends it to +multi-entity scenes with optional guest support and a follow-on +interjection beat. 1. Parse the user's prose with the classifier into typed segments. 2. Append a ``user_turn`` event capturing both the original prose and the parsed segments. 3. Append a placeholder ``assistant_turn_started`` marker so observers know a response is in flight. -4. Build the narrative prompt, dropping OOC segments before they reach the - bot (per Requirements §6.1 the OOC convention is for the author to talk - to the system, not to the in-fiction bot). -5. 
Stream tokens from the LLM, broadcasting each chunk over the chat's SSE +4. Detect the addressee (host vs. guest) from the prose using a simple + word-boundary substring match — see :func:`_detect_addressee_id`. +5. Build the narrative prompt for the addressee, dropping OOC segments + before they reach the bot (per Requirements §6.1 the OOC convention is + for the author to talk to the system, not to the in-fiction bot). +6. Stream tokens from the LLM, broadcasting each chunk over the chat's SSE channel as a ``token`` event so any subscribed browser tab sees them arrive in real time. -6. On stream complete, append an ``assistant_turn`` event with the full +7. On stream complete, append an ``assistant_turn`` event with the full text and ``truncated=False``. Then run a post-turn state-update pass (Requirements §3.4): one classifier call per directed edge between present entities, each producing an ``edge_update`` event with - affinity/trust/knowledge deltas. Finally publish a ``turn_html`` - event with a ready-to-swap HTML fragment so HTMX's SSE extension can - append it to the timeline without a page reload. -7. Return ``204 No Content`` — the SSE channel is the real conveyor of - state, not the POST response body. + affinity/trust/knowledge deltas. +8. When a guest is present, run the interjection classifier (§6.2). If it + fires we stream a second narrative as the silent witness, append a + second ``assistant_turn`` event linked to the same ``user_turn_id``, + and re-run memory + state-update for the interjector. Each half + streams in its own task registered under the chat's in-flight slot: + a cancel during the primary stream skips the interjection, and a + cancel during the interjection truncates it. +9. Scene-close detection runs after the (primary + optional interjection) + beats land so the close summary sees the full closing scene. T45's + guest-aware ``apply_scene_close_summary`` writes per-POV summaries for + each present witness. +10. Publish a ``turn_html`` event for each turn so HTMX's SSE extension + can append it to the timeline without a page reload. +11.
Return ``204 No Content`` — the SSE channel is the real conveyor of + state, not the POST response body. Errors during streaming flip the assistant_turn's ``truncated`` flag to ``True`` and we still commit what we received. ``asyncio.CancelledError`` is treated identically and re-raised after recording the partial turn. +A cancellation mid-interjection skips the interjector's state/memory +follow-up so we don't run classifiers against a half-formed beat. """ from __future__ import annotations @@ -34,18 +49,20 @@ from __future__ import annotations import asyncio import html import json +import re from fastapi import APIRouter, Depends, Form, HTTPException, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from chat.eventlog.log import append_and_apply, append_event from chat.services.background import SignificanceJob -from chat.services.memory_write import record_turn_memory +from chat.services.interjection import detect_interjection +from chat.services.memory_write import record_turn_memory_for_present +from chat.services.multi_state_update import compute_state_updates_for_present from chat.services.prompt import assemble_narrative_prompt from chat.services.rewind import compute_rewind_preview, execute_rewind from chat.services.scene_close import detect_scene_close from chat.services.scene_summarize import apply_scene_close_summary -from chat.services.state_update import compute_state_update from chat.services.turn_parse import ParsedTurn, parse_turn from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you @@ -114,6 +131,84 @@ def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]: return out +def _detect_addressee_id( + prose: str, host_bot: dict, guest_bot: dict | None +) -> str: + """Return the bot id of the addressee for ``prose``. + + Phase 2 v1 uses a simple case-insensitive whole-word match. 
The host + is the default — addressee flips to guest only when the guest's name + appears in the prose AND the host's does not. If both names match + or neither matches, the host keeps the floor. This bias keeps the + primary speaker stable across ambiguous prose; the interjection + branch (later in the turn flow) is how the silent witness gets a word + in edgewise when warranted. + """ + if guest_bot is None: + return host_bot["id"] + host_name = host_bot.get("name") or "" + guest_name = guest_bot.get("name") or "" + host_match = bool( + host_name + and re.search(rf"\b{re.escape(host_name)}\b", prose, re.IGNORECASE) + ) + guest_match = bool( + guest_name + and re.search(rf"\b{re.escape(guest_name)}\b", prose, re.IGNORECASE) + ) + if guest_match and not host_match: + return guest_bot["id"] + return host_bot["id"] + + +def _gather_state_update_inputs( + conn, + *, + host_bot: dict, + guest_bot: dict | None, + you_entity: dict, +) -> tuple[list[str], dict[str, str], dict[str, str], dict[tuple[str, str], dict]]: + """Collect ``(present_ids, present_names, personas, prior_edges)`` for + a multi-entity state-update pass. + + Phase 2 v1 always pairs ``you`` with the host and (when present) the + guest. ``prior_edges`` falls back to the schema default 50/50 baseline + when no row exists yet — that mirrors the Phase 1 single-pair flow. + + Order matters: the host comes first so the directed-pair iteration + in :func:`compute_state_updates_for_present` matches the Phase 1 + sequence (host->you, then you->host). Existing tests pin the canned- + response queue to that order — keeping it stable means we don't + have to reshuffle test fixtures across the Phase 2 cutover. 
+ """ + present_ids: list[str] = [host_bot["id"], "you"] + present_names: dict[str, str] = { + host_bot["id"]: host_bot["name"], + "you": you_entity.get("name") or "you", + } + personas: dict[str, str] = { + host_bot["id"]: host_bot.get("persona") or "", + "you": you_entity.get("persona") or "", + } + if guest_bot is not None: + present_ids.append(guest_bot["id"]) + present_names[guest_bot["id"]] = guest_bot["name"] + personas[guest_bot["id"]] = guest_bot.get("persona") or "" + + prior_edges: dict[tuple[str, str], dict] = {} + for src in present_ids: + for tgt in present_ids: + if src == tgt: + continue + edge = get_edge(conn, src, tgt) or { + "affinity": 50, + "trust": 50, + "summary": "", + } + prior_edges[(src, tgt)] = edge + return present_ids, present_names, personas, prior_edges + + @router.post("/chats/{chat_id}/turns") async def post_turn( chat_id: str, @@ -137,6 +232,15 @@ async def post_turn( detail=f"host bot not found: {chat['host_bot_id']}", ) + guest_bot = None + guest_bot_id = chat.get("guest_bot_id") + if guest_bot_id is not None: + guest_bot = get_bot(conn, guest_bot_id) + # If the chat references a deleted guest we degrade to single-bot + # rather than 404 — the chat is still usable as a 1:1. + if guest_bot is None: + guest_bot_id = None + settings = request.app.state.settings # 1. Parse turn (classifier). @@ -156,7 +260,16 @@ async def post_turn( }, ) - # 3. Append assistant_turn_started placeholder. ``user_turn``, + # 3. Determine the addressee. Done before assistant_turn_started so the + # placeholder reflects the bot the user is actually talking to (host + # in 1:1, host-or-guest in multi-entity). + addressee_id = _detect_addressee_id(prose, host_bot, guest_bot) + addressee_bot = ( + guest_bot if (guest_bot is not None and addressee_id == guest_bot["id"]) + else host_bot + ) + + # 4. Append assistant_turn_started placeholder. 
``user_turn``, # ``assistant_turn_started``, and ``assistant_turn`` have no registered # projector handlers — they live in the event_log purely for transcript # rendering — so we don't call ``project`` here. (Re-projecting now would @@ -166,12 +279,15 @@ async def post_turn( kind="assistant_turn_started", payload={ "chat_id": chat_id, - "speaker_id": host_bot["id"], + "speaker_id": addressee_bot["id"], "user_turn_id": user_turn_event_id, }, ) - # 4. Build the narrative prompt. + # 5. Build the narrative prompt for the addressee. ``guest_id`` is + # passed explicitly so the prompt assembler renders the guest's + # activity / group-node block when applicable. The assembler is + # tolerant of ``guest_id is None`` so this is a no-op for 1:1 chats. recent = _read_recent_dialogue(conn, chat_id, limit=20) # Drop the just-appended user turn from ``recent`` — it's passed as # ``user_turn_prose`` to the assembler and would otherwise duplicate. @@ -180,189 +296,327 @@ async def post_turn( messages = assemble_narrative_prompt( conn, chat_id=chat_id, - speaker_bot_id=host_bot["id"], + speaker_bot_id=addressee_bot["id"], user_turn_prose=prompt_prose if prompt_prose else None, recent_dialogue=recent, budget_soft=settings.narrative_budget_soft, budget_hard=settings.narrative_budget_hard, + guest_id=guest_bot_id, ) - # 5. Stream and accumulate tokens. The stream runs as a Task so the + # 6. Stream and accumulate tokens. The stream runs as a Task so the # /turns/cancel route can invoke ``Task.cancel()`` to abort it # mid-stream. ``accumulated`` is a closure over the inner coroutine, # so when the await on ``stream_task`` raises CancelledError below # we still see whatever tokens were appended before cancellation. 
- accumulated: list[str] = [] - truncated = False + primary_accumulated: list[str] = [] + primary_truncated = False cancelled = False - async def _stream() -> None: + async def _stream_primary() -> None: async for chunk in client.stream( messages, model=settings.narrative_model, max_tokens=settings.narrative_max_tokens, temperature=settings.narrative_temperature, ): - accumulated.append(chunk) + primary_accumulated.append(chunk) await publish( chat_id, { "event": "token", "text": chunk, - "speaker_id": host_bot["id"], + "speaker_id": addressee_bot["id"], }, ) - stream_task = asyncio.create_task(_stream()) + stream_task = asyncio.create_task(_stream_primary()) _in_flight_tasks[chat_id] = stream_task try: await stream_task except asyncio.CancelledError: # Preserve the partial output before letting the cancellation # propagate so the transcript reflects what the user actually saw. - truncated = True + primary_truncated = True cancelled = True except Exception: # Surface as a truncated turn rather than losing the partial output. - truncated = True + primary_truncated = True finally: # Always unregister so a subsequent turn can register a fresh task. _in_flight_tasks.pop(chat_id, None) - full_text = "".join(accumulated) + primary_text = "".join(primary_accumulated) - # 6. Append the assistant_turn with the final text. (See note above on + # 7. Append the assistant_turn with the final text. (See note above on # why we skip ``project`` for these transcript-only event kinds.) append_event( conn, kind="assistant_turn", payload={ "chat_id": chat_id, - "speaker_id": host_bot["id"], - "text": full_text, - "truncated": truncated, + "speaker_id": addressee_bot["id"], + "text": primary_text, + "truncated": primary_truncated, "user_turn_id": user_turn_event_id, }, ) - # 6a. Per-turn memory write (Plan §11.1, T21). 
Phase 1 single-bot: - # only the host bot has a memory store, witness flags are - # ``[you=1, host=1, guest=0]``, and ``pov_summary`` is the raw - # narrative text (T27 will rewrite at scene close). Significance - # defaults to 1; T22's async classifier pass will overwrite it. + # 7a. Per-turn memory write (Plan §11.1, T21 / T41). With a guest + # present this fans out to one ``memory_written`` event per witness + # (host + guest); without a guest it preserves the Phase 1 single + # write keyed on the host. Witness flags are set inside the helper. scene = active_scene(conn, chat_id) - _event_id, memory_id = record_turn_memory( + memory_results = record_turn_memory_for_present( conn, chat_id=chat_id, host_bot_id=host_bot["id"], - narrative_text=full_text, + guest_bot_id=guest_bot_id, + narrative_text=primary_text, scene_id=scene["id"] if scene else None, chat_clock_at=chat.get("time"), ) - # 6b. Post-turn state-update pass (Requirements §3.4). For Phase 1 - # the only present entities are ``you`` and ``host_bot`` so we run - # two classifier calls — one per directed edge — and append the - # resulting ``edge_update`` events. The recent-dialogue slice is - # re-read here so the pass sees the just-appended assistant turn. - # We use ``append_and_apply`` (vs append + project) because the - # edge_update handler is *not* replay-safe: re-projecting prior - # events would re-apply their deltas on top of the live row. - recent_for_update = _read_recent_dialogue(conn, chat_id, limit=10) + # 7b. Post-turn state-update pass (Requirements §3.4 / T40). All + # directed pairs over the present entities — 2 pairs for 1:1, 6 for + # 3-entity scenes. Run sequentially via the inner helper which honors + # the Featherless 2-conn cap. 
you_entity = get_you(conn) or {"name": "you", "persona": ""} last_at = chat.get("time") + recent_for_update = _read_recent_dialogue(conn, chat_id, limit=10) - edge_b2y = get_edge(conn, host_bot["id"], "you") or { - "affinity": 50, - "trust": 50, - "summary": "", - } - update_b2y = await compute_state_update( + present_ids, present_names, personas, prior_edges = ( + _gather_state_update_inputs( + conn, + host_bot=host_bot, + guest_bot=guest_bot, + you_entity=you_entity, + ) + ) + + state_updates = await compute_state_updates_for_present( client, - model=settings.classifier_model, - source_id=host_bot["id"], - target_id="you", - source_name=host_bot["name"], - source_persona=host_bot.get("persona", ""), - target_name=you_entity.get("name", "you"), - prior_affinity=edge_b2y["affinity"], - prior_trust=edge_b2y["trust"], - prior_summary=edge_b2y.get("summary", "") or "", + classifier_model=settings.classifier_model, + present_ids=present_ids, + present_names=present_names, + personas=personas, + prior_edges=prior_edges, recent_dialogue=recent_for_update, + timeout_s=settings.classifier_timeout_s, ) - append_and_apply( - conn, - kind="edge_update", - payload={ - "source_id": host_bot["id"], - "target_id": "you", - "chat_id": chat_id, - "affinity_delta": update_b2y.affinity_delta, - "trust_delta": update_b2y.trust_delta, - "knowledge_facts": update_b2y.knowledge_facts, - "last_interaction_at": last_at, - "last_interaction_chat_id": chat_id, - }, - ) + for src_id, tgt_id, update in state_updates: + append_and_apply( + conn, + kind="edge_update", + payload={ + "source_id": src_id, + "target_id": tgt_id, + "chat_id": chat_id, + "affinity_delta": update.affinity_delta, + "trust_delta": update.trust_delta, + "knowledge_facts": update.knowledge_facts, + "last_interaction_at": last_at, + "last_interaction_chat_id": chat_id, + }, + ) - edge_y2b = get_edge(conn, "you", host_bot["id"]) or { - "affinity": 50, - "trust": 50, - "summary": "", - } - update_y2b = await 
compute_state_update( - client, - model=settings.classifier_model, - source_id="you", - target_id=host_bot["id"], - source_name=you_entity.get("name", "you"), - source_persona=you_entity.get("persona", "") or "", - target_name=host_bot["name"], - prior_affinity=edge_y2b["affinity"], - prior_trust=edge_y2b["trust"], - prior_summary=edge_y2b.get("summary", "") or "", - recent_dialogue=recent_for_update, - ) - append_and_apply( - conn, - kind="edge_update", - payload={ - "source_id": "you", - "target_id": host_bot["id"], - "chat_id": chat_id, - "affinity_delta": update_y2b.affinity_delta, - "trust_delta": update_y2b.trust_delta, - "knowledge_facts": update_y2b.knowledge_facts, - "last_interaction_at": last_at, - "last_interaction_chat_id": chat_id, - }, - ) - - # 6c. Enqueue the async significance pass (Plan §11.1, T22). The + # 7c. Enqueue the async significance pass (Plan §11.1, T22). The # worker scores the just-written memory 0-3, updates significance, # and auto-pins on score 3 with the §8.5 soft-cap eviction rule. - # Enqueued before the broadcast so it's outstanding by the time the - # client sees ``turn_html`` — but the worker is async, so the user - # never blocks on it. + # Phase 2 picks the host's memory id as the canonical input — guest + # POV memories piggyback on the same significance score (the prose + # they record is identical for v2; per-POV rewrite happens at scene + # close in T45 and downstream-of-significance). worker = getattr(request.app.state, "background_worker", None) - if worker is not None and memory_id is not None: + host_event_memory = memory_results.get(host_bot["id"]) + host_memory_id = host_event_memory[1] if host_event_memory else None + if worker is not None and host_memory_id is not None: worker.enqueue( SignificanceJob( - memory_id=memory_id, - narrative_text=full_text, + memory_id=host_memory_id, + narrative_text=primary_text, prior_dialogue=recent_for_update, host_bot_id=host_bot["id"], ) ) - # 6d. 
Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn - # so the bot's response is the closing scene's final beat — closing - # before narrative would force the bot to speak "in no scene", which - # is awkward. Hard signals only in Phase 1: container change parsed - # from prose, or explicit "fade out" / "we're done here" patterns. - # On classifier failure the service returns ``should_close=False`` - # so the turn flow keeps moving; the manual close button in the - # drawer is the always-available fallback. + # 8. Interjection branch (T39 / T44). Only fires when the chat has a + # guest AND the addressee was the bot we *can* interject for (i.e. + # not the lone bot in a 1:1 chat). The silent witness is whichever + # bot didn't get the addressee slot. We only run this when the + # primary stream actually completed — a cancelled or errored primary + # short-circuits the follow-on so we don't classifier-spam against a + # half-formed beat. + interjection_text: str | None = None + interjection_speaker_id: str | None = None + interjection_truncated = False + if ( + guest_bot is not None + and not cancelled + and not primary_truncated + and primary_text.strip() + ): + # Identify the silent witness — the bot that is NOT the addressee. 
+ if addressee_id == host_bot["id"]: + silent_witness = guest_bot + else: + silent_witness = host_bot + + edge_w_to_addr = get_edge( + conn, silent_witness["id"], addressee_bot["id"] + ) or {"affinity": 50, "trust": 50, "summary": ""} + edge_w_to_you = get_edge(conn, silent_witness["id"], "you") or { + "affinity": 50, + "trust": 50, + "summary": "", + } + + decision = await detect_interjection( + client, + classifier_model=settings.classifier_model, + addressee_name=addressee_bot["name"], + addressee_just_said=primary_text, + silent_witness_name=silent_witness["name"], + silent_witness_persona=silent_witness.get("persona") or "", + silent_witness_edge_to_addressee=edge_w_to_addr, + silent_witness_edge_to_you=edge_w_to_you, + you_just_said=prose, + timeout_s=settings.classifier_timeout_s, + ) + + if decision.should_interject: + interjection_speaker_id = silent_witness["id"] + + # Re-read recent_dialogue so the just-appended assistant_turn + # (the addressee's beat) is in the prompt context. 
+ interject_recent = _read_recent_dialogue(conn, chat_id, limit=20) + if interject_recent and interject_recent[-1].get("speaker") == "you": + interject_recent = interject_recent[:-1] + interject_messages = assemble_narrative_prompt( + conn, + chat_id=chat_id, + speaker_bot_id=silent_witness["id"], + addressee=addressee_bot["id"], + user_turn_prose=prompt_prose if prompt_prose else None, + recent_dialogue=interject_recent, + budget_soft=settings.narrative_budget_soft, + budget_hard=settings.narrative_budget_hard, + guest_id=guest_bot_id, + ) + + interject_accumulated: list[str] = [] + + async def _stream_interjection() -> None: + async for chunk in client.stream( + interject_messages, + model=settings.narrative_model, + max_tokens=settings.narrative_max_tokens, + temperature=settings.narrative_temperature, + ): + interject_accumulated.append(chunk) + await publish( + chat_id, + { + "event": "token", + "text": chunk, + "speaker_id": silent_witness["id"], + }, + ) + + interject_task = asyncio.create_task(_stream_interjection()) + _in_flight_tasks[chat_id] = interject_task + try: + await interject_task + except asyncio.CancelledError: + interjection_truncated = True + cancelled = True + except Exception: + interjection_truncated = True + finally: + _in_flight_tasks.pop(chat_id, None) + + interjection_text = "".join(interject_accumulated) + + append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": chat_id, + "speaker_id": silent_witness["id"], + "text": interjection_text, + "truncated": interjection_truncated, + "user_turn_id": user_turn_event_id, + "interjection_of": addressee_bot["id"], + }, + ) + + # Skip the downstream classifier passes if the interjection + # was cancelled mid-stream — we don't want to score a partial + # beat the user never got to read in full. + if not interjection_truncated: + # Re-run the multi-pair state update — the interjector + # adding their voice plausibly shifts edges for everyone + # in the room. 
Idempotent enough for v2 (deltas accumulate; + # no stale state). Re-read recent so the just-appended + # interjection turn is in scope. + recent_post_interject = _read_recent_dialogue( + conn, chat_id, limit=10 + ) + # Re-fetch prior edges so deltas land on the post-primary + # state rather than the pre-turn baseline. + _, _, _, prior_edges_post = _gather_state_update_inputs( + conn, + host_bot=host_bot, + guest_bot=guest_bot, + you_entity=you_entity, + ) + state_updates_post = await compute_state_updates_for_present( + client, + classifier_model=settings.classifier_model, + present_ids=present_ids, + present_names=present_names, + personas=personas, + prior_edges=prior_edges_post, + recent_dialogue=recent_post_interject, + timeout_s=settings.classifier_timeout_s, + ) + for src_id, tgt_id, update in state_updates_post: + append_and_apply( + conn, + kind="edge_update", + payload={ + "source_id": src_id, + "target_id": tgt_id, + "chat_id": chat_id, + "affinity_delta": update.affinity_delta, + "trust_delta": update.trust_delta, + "knowledge_facts": update.knowledge_facts, + "last_interaction_at": last_at, + "last_interaction_chat_id": chat_id, + }, + ) + + # Memory write for the interjection beat — a second pair + # of memory_written events (host + guest POVs). + record_turn_memory_for_present( + conn, + chat_id=chat_id, + host_bot_id=host_bot["id"], + guest_bot_id=guest_bot_id, + narrative_text=interjection_text, + scene_id=scene["id"] if scene else None, + chat_clock_at=chat.get("time"), + ) + + # 9. Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn + # and the optional interjection so the bots' responses are part of + # the closing scene's final beat — closing before narrative would + # force the bot to speak "in no scene", which is awkward. Hard + # signals only in Phase 1: container change parsed from prose, or + # explicit "fade out" / "we're done here" patterns. 
On classifier + # failure the service returns ``should_close=False`` so the turn + # flow keeps moving; the manual close button in the drawer is the + # always-available fallback. # # Skip empty prose — no signal to classify and no point spending a # round-trip. Skip when there's no active scene (e.g. after a prior @@ -393,11 +647,12 @@ async def post_turn( "significance": 0, }, ) - # T27: per-POV summary + edge summary update + knowledge - # promotion. Runs synchronously after the close so the - # next turn (or a subsequent GET /chats/) sees the - # rewritten memories and edge summary. Tolerates classifier - # failure (returns the empty default and skips the writes). + # T27 / T45: per-POV summary + edge summary update + knowledge + # promotion for each present witness (host always; guest when + # present). Runs synchronously after the close so the next + # turn (or a subsequent GET /chats/) sees the rewritten + # memories and edge summaries. Tolerates classifier failure + # (returns the empty default and skips the writes). await apply_scene_close_summary( conn, client, @@ -408,24 +663,50 @@ async def post_turn( timeout_s=settings.classifier_timeout_s, ) - # 7. Broadcast a JSON completion event (for JS consumers) and an HTML - # fragment event (for HTMX SSE swap-into-timeline). + # 10. Broadcast a JSON completion event (for JS consumers) and an HTML + # fragment event (for HTMX SSE swap-into-timeline). One pair per + # written assistant_turn so the timeline ends up with both the + # primary and the interjection beat in the right order. 
await publish( chat_id, { "event": "assistant_turn_complete", - "speaker_id": host_bot["id"], - "text": full_text, - "truncated": truncated, + "speaker_id": addressee_bot["id"], + "text": primary_text, + "truncated": primary_truncated, }, ) - assistant_html = _render_turn_html( - host_bot["name"], full_text, role="bot" + primary_html = _render_turn_html( + addressee_bot["name"], primary_text, role="bot" ) await publish( - chat_id, {"event": "turn_html", "data": assistant_html} + chat_id, {"event": "turn_html", "data": primary_html} ) + if interjection_text is not None and interjection_speaker_id is not None: + # The interjector's display name is whichever bot wasn't the + # addressee — pull it from the in-scope variable directly. + interject_speaker_name = ( + host_bot["name"] + if interjection_speaker_id == host_bot["id"] + else (guest_bot["name"] if guest_bot is not None else "bot") + ) + await publish( + chat_id, + { + "event": "assistant_turn_complete", + "speaker_id": interjection_speaker_id, + "text": interjection_text, + "truncated": interjection_truncated, + }, + ) + interject_html = _render_turn_html( + interject_speaker_name, interjection_text, role="bot" + ) + await publish( + chat_id, {"event": "turn_html", "data": interject_html} + ) + if cancelled: # Re-raise after the partial-turn has been recorded. raise asyncio.CancelledError diff --git a/tests/test_turn_flow.py b/tests/test_turn_flow.py index 93f315f..7d04755 100644 --- a/tests/test_turn_flow.py +++ b/tests/test_turn_flow.py @@ -202,3 +202,487 @@ def test_get_chat_renders_existing_turns(client, tmp_path): body = response.text assert "hello" in body assert "Hi there." in body + + +# --------------------------------------------------------------------------- +# Phase 2 (T44) — multi-entity turn flow. +# +# These tests cover the post_turn flow when a guest is present: addressee +# detection, multi-pair state-update + multi-witness memory writes, and +# the optional interjection follow-on. 
Each test installs its own +# MockLLMClient with a canned-response queue tailored to the call shape +# of that scenario; the queue is documented at the top of each test so +# the orchestration is auditable. +# --------------------------------------------------------------------------- + + +def _bot_payload(bot_id: str, name: str, persona: str = "") -> dict: + return { + "id": bot_id, + "name": name, + "persona": persona or f"persona for {name}", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "", + "kickoff_prose": "...", + } + + +def _seed_chat_with_guest(db_path: Path) -> None: + """Author host BotA + guest BotB, create a chat with both wired in, + and seed an open scene plus minimal activity rows so the prompt + assembler sees a third party. Edges are seeded for all six directed + pairs at the schema-default 50/50 baseline so multi-pair state + updates land cleanly.""" + with open_db(db_path) as conn: + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB")) + append_event( + conn, + kind="you_authored", + payload={"name": "Me", "pronouns": "they/them", "persona": ""}, + ) + append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_bot_a", + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + # Container + open scene so scene_close detection has something + # to act on in the per-POV summary test. 
+ append_event( + conn, + kind="container_created", + payload={ + "chat_id": "chat_bot_a", + "name": "office", + "type": "workplace", + "properties": {}, + }, + ) + append_event( + conn, + kind="scene_opened", + payload={ + "chat_id": "chat_bot_a", + "container_id": 1, + "started_at": "2026-04-26T20:00:00+00:00", + "participants": ["you", "bot_a", "bot_b"], + }, + ) + # Seed all six directed edges so state-update writes land on + # initialized rows. Knowledge fact on bot_a -> you exercises + # the existing-fact preservation path. + for src, tgt, facts in [ + ("bot_a", "you", ["coworker"]), + ("you", "bot_a", []), + ("bot_b", "you", []), + ("you", "bot_b", []), + ("bot_a", "bot_b", []), + ("bot_b", "bot_a", []), + ]: + append_event( + conn, + kind="edge_update", + payload={ + "source_id": src, + "target_id": tgt, + "chat_id": "chat_bot_a", + "knowledge_facts": facts, + }, + ) + for entity_id, verb in [ + ("you", "talking"), + ("bot_a", "listening"), + ("bot_b", "listening"), + ]: + append_event( + conn, + kind="activity_change", + payload={ + "entity_id": entity_id, + "posture": "sitting", + "action": { + "verb": verb, + "interruptible": True, + "required_attention": "low", + "expected_duration": "ongoing", + }, + "attention": "", + "holding": [], + "status": {}, + }, + ) + project(conn) + + +def _override_llm(canned: list[str]) -> MockLLMClient: + """Wire a fresh ``MockLLMClient`` and return it so tests can introspect + the residual canned queue after the request.""" + from chat.web.kickoff import get_llm_client + + mock = MockLLMClient(canned=list(canned)) + app.dependency_overrides[get_llm_client] = lambda: mock + return mock + + +def _zero_state() -> str: + return json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + + +@pytest.fixture +def app_state_setup(tmp_path, monkeypatch): + """Same env wiring as the existing ``client`` fixture but without a + pre-installed MockLLMClient — the multi-entity tests pin their own + canned queues 
per scenario. + """ + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + db = tmp_path / "test.db" + monkeypatch.setenv("CHAT_DB_PATH", str(db)) + with TestClient(app) as c: + app.state.background_worker.enabled = False + yield c + app.dependency_overrides.clear() + + +def test_single_bot_turn_no_guest_regression(app_state_setup, tmp_path): + """No-guest regression: the canned-response queue remains parse + + narrative + 2 state-updates. Interjection is path-bypassed because + the chat has no guest, so ``detect_interjection`` is NOT invoked. + Ends with one user_turn, one assistant_turn, two edge_updates, and a + single ``memory_written``. + """ + _seed(tmp_path / "test.db") + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "hello"}]} + ) + mock = _override_llm( + [canned_parse, "Hi there.", _zero_state(), _zero_state()] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "hello"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + + # No guest -> no interjection classifier call -> queue fully drained. + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + cur = conn.execute( + "SELECT kind FROM event_log " + "WHERE kind IN ('user_turn', 'assistant_turn', 'edge_update', " + " 'memory_written') ORDER BY id" + ) + kinds = [r[0] for r in cur.fetchall()] + user_turns = [k for k in kinds if k == "user_turn"] + assistant_turns = [k for k in kinds if k == "assistant_turn"] + edge_updates_after_seed = [k for k in kinds if k == "edge_update"] + memory_writes = [k for k in kinds if k == "memory_written"] + assert len(user_turns) == 1 + assert len(assistant_turns) == 1 + # Seed adds exactly one edge_update (bot_a -> you); the post-turn + # pass adds two more for a total of three. 
+ assert len(edge_updates_after_seed) == 3 + assert len(memory_writes) == 1 + + +def test_multi_bot_turn_no_interjection(app_state_setup, tmp_path): + """Chat has a guest; ``detect_interjection`` returns False. Verify: + 1 user_turn + 1 assistant_turn + 6 *post-turn* edge_updates + 2 + memory_written events. Single turn_html broadcast. + + Canned queue (10 calls): + 1. parse_turn + 2. narrative stream (primary, addressee = host because the prose + doesn't name the guest) + 3-8. 6 state-update calls (one per directed pair across {you, + bot_a, bot_b}) + 9. detect_interjection -> should_interject=False + 10. detect_scene_close -> should_close=False + """ + _seed_chat_with_guest(tmp_path / "test.db") + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "hello room"}]} + ) + canned = [ + canned_parse, + "Greetings.", + _zero_state(), _zero_state(), _zero_state(), + _zero_state(), _zero_state(), _zero_state(), + json.dumps({"should_interject": False, "reason": "calm"}), + json.dumps({"should_close": False, "reason": "no signal"}), + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "hello room"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + # All 10 canned slots should have been consumed. + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + # Count post-turn edge_updates (i.e. those after the latest + # assistant_turn id). 
+ max_at = conn.execute( + "SELECT MAX(id) FROM event_log WHERE kind = 'assistant_turn'" + ).fetchone()[0] + cur = conn.execute( + "SELECT COUNT(*) FROM event_log " + "WHERE kind = 'edge_update' AND id > ?", + (max_at,), + ) + post_turn_edge_updates = cur.fetchone()[0] + + cur = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'user_turn'" + ) + user_turn_count = cur.fetchone()[0] + cur = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'assistant_turn'" + ) + assistant_turn_count = cur.fetchone()[0] + cur = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'" + ) + memory_count = cur.fetchone()[0] + + assert user_turn_count == 1 + assert assistant_turn_count == 1 + assert post_turn_edge_updates == 6 + assert memory_count == 2 + + +def test_multi_bot_turn_with_interjection(app_state_setup, tmp_path): + """Chat has a guest; ``detect_interjection`` returns True. Verify: + 1 user_turn + 2 assistant_turns + (6 + 6) post-turn edge_updates + + 4 memory_written events. + + Canned queue (17 calls): + 1. parse_turn + 2. narrative stream (primary) + 3-8. 6 state-update calls (post-primary) + 9. detect_interjection -> should_interject=True + 10. narrative stream (interjection) + 11-16. 6 state-update calls (post-interjection) + 17. 
detect_scene_close -> should_close=False + """ + _seed_chat_with_guest(tmp_path / "test.db") + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "tell me"}]} + ) + canned = [ + canned_parse, + "Primary beat.", + _zero_state(), _zero_state(), _zero_state(), + _zero_state(), _zero_state(), _zero_state(), + json.dumps({"should_interject": True, "reason": "jealous"}), + "Interjection beat!", + _zero_state(), _zero_state(), _zero_state(), + _zero_state(), _zero_state(), _zero_state(), + json.dumps({"should_close": False, "reason": "no signal"}), + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "tell me"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + cur = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'assistant_turn'" + ) + assistant_count = cur.fetchone()[0] + cur = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'" + ) + memory_count = cur.fetchone()[0] + # All edge_updates after the FIRST assistant_turn are post-turn. + first_at = conn.execute( + "SELECT MIN(id) FROM event_log WHERE kind = 'assistant_turn'" + ).fetchone()[0] + post_turn_edges = conn.execute( + "SELECT COUNT(*) FROM event_log " + "WHERE kind = 'edge_update' AND id > ?", + (first_at,), + ).fetchone()[0] + + # Both assistant_turn payloads should reference the same user_turn + # and the second one tags ``interjection_of`` the first speaker. + rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'assistant_turn' ORDER BY id" + ).fetchall() + first_payload = json.loads(rows[0][0]) + second_payload = json.loads(rows[1][0]) + + assert assistant_count == 2 + assert memory_count == 4 + assert post_turn_edges == 12 + assert first_payload["text"] == "Primary beat." + assert second_payload["text"] == "Interjection beat!" 
+ # The silent witness is the bot that wasn't the primary addressee. + assert second_payload["interjection_of"] == first_payload["speaker_id"] + assert second_payload["speaker_id"] != first_payload["speaker_id"] + assert first_payload["user_turn_id"] == second_payload["user_turn_id"] + + +def test_multi_bot_turn_scene_close_writes_per_pov_summaries( + app_state_setup, tmp_path +): + """Chat has a guest, prose hard-signals a scene close, classifier + confirms. Verify a ``scene_closed`` event lands and per-POV summary + rewrites fire for both bots (memory.pov_summary changes for each). + Interjection short-circuits at False so the queue stays compact. + + Canned queue (12 calls): + 1. parse_turn + 2. narrative stream (primary) + 3-8. 6 state-update calls + 9. detect_interjection -> False (no follow-on stream) + 10. detect_scene_close -> True + 11. apply_scene_close_summary host POV + 12. apply_scene_close_summary guest POV + """ + _seed_chat_with_guest(tmp_path / "test.db") + canned_parse = json.dumps( + { + "segments": [ + {"kind": "narration", "text": "we are done here, fade out"} + ] + } + ) + pov_payload = json.dumps( + { + "summary": "BotA noticed the day winding down.", + "knowledge_facts": [], + "relationship_summary": "warmer", + } + ) + pov_payload_guest = json.dumps( + { + "summary": "BotB watched the scene close.", + "knowledge_facts": [], + "relationship_summary": "warmer", + } + ) + canned = [ + canned_parse, + "Goodnight.", + _zero_state(), _zero_state(), _zero_state(), + _zero_state(), _zero_state(), _zero_state(), + json.dumps({"should_interject": False, "reason": "calm"}), + json.dumps({"should_close": True, "reason": "fade out signaled"}), + pov_payload, + pov_payload_guest, + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "we are done here, fade out"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + with 
open_db(tmp_path / "test.db") as conn: + cur = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'" + ) + scene_close_count = cur.fetchone()[0] + # One memory_pov_summary manual_edit per witness. + cur = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'manual_edit'" + ) + manual_edits = [json.loads(r[0]) for r in cur.fetchall()] + pov_edits = [ + e for e in manual_edits + if e.get("target_kind") == "memory_pov_summary" + ] + # After the rewrite, bot_a's scene-1 memory carries the host POV + # and bot_b's scene-1 memory carries the guest POV. + host_pov = conn.execute( + "SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = 1", + ("bot_a",), + ).fetchone() + guest_pov = conn.execute( + "SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = 1", + ("bot_b",), + ).fetchone() + + assert scene_close_count == 1 + # Two memory rewrites — one per witness. + assert len(pov_edits) == 2 + assert host_pov is not None and "BotA noticed" in host_pov[0] + assert guest_pov is not None and "BotB watched" in guest_pov[0] + + +def test_addressee_detection_routes_to_named_bot(app_state_setup, tmp_path): + """Prose that names the guest by name routes the primary turn to the + guest. Interjection (when fired) makes the host the silent witness + and the second assistant_turn carries the host as speaker. + + Canned queue: same shape as the with-interjection test (16 calls) + plus the trailing scene_close decision. 
+ """ + _seed_chat_with_guest(tmp_path / "test.db") + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "BotB, what do you think?"}]} + ) + canned = [ + canned_parse, + "BotB pondering.", + _zero_state(), _zero_state(), _zero_state(), + _zero_state(), _zero_state(), _zero_state(), + json.dumps({"should_interject": True, "reason": "host wants in"}), + "BotA chiming in.", + _zero_state(), _zero_state(), _zero_state(), + _zero_state(), _zero_state(), _zero_state(), + json.dumps({"should_close": False, "reason": "no signal"}), + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "BotB, what do you think?"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'assistant_turn' ORDER BY id" + ).fetchall() + primary_payload = json.loads(rows[0][0]) + interjection_payload = json.loads(rows[1][0]) + + # Primary speaker is the guest because the prose names BotB and not + # BotA (case-insensitive whole-word match). + assert primary_payload["speaker_id"] == "bot_b" + # Interjection follow-on goes to the silent witness — the host. + assert interjection_payload["speaker_id"] == "bot_a" + assert interjection_payload["interjection_of"] == "bot_b"