diff --git a/CLAUDE.md b/CLAUDE.md index 9ac8550..8d80cd5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -204,15 +204,7 @@ Phase 2.5 cleanup shipped end-to-end across 8 tasks (T68–T75). Two CLAUDE.md b ### Phase 2.6 / 3 backlog -New follow-ups discovered during Phase 2.5 execution. None are blocking; pick up at any time. - -- **Frontend handler for `turn_html_replace` SSE event (from T73.1 review)**: regenerate's backend broadcast lands, but no live tab swaps the regenerated turn until a JS handler is wired. The existing `turn_html` event uses HTMX `sse-swap` to append; `turn_html_replace` ships JSON with `supersedes_id` for replacement semantics. Phase 2.6 should wire the JS to swap the prior turn's DOM node in place. -- **Cancel/stop hook for in-flight regenerate streams (from T73 review)**: `post_turn` registers stream tasks in `_in_flight_tasks` so the user can stop them. Regenerate doesn't. A user clicking "Stop" mid-regenerate has no cancel hook today. -- **DRY: regenerate vs post_turn (from T73 review)**: recent-dialogue assembly and prior-edges block are duplicated between `chat/services/regenerate.py` and `chat/web/turns.py`. Extract to shared helpers analogous to `_gather_state_update_inputs`. -- **Sibling-discovery query optimization (from T73 review)**: `regenerate.py`'s sibling-assistant-turn lookup scans all non-superseded `assistant_turn` rows globally. Adding a `chat_id` predicate via JSON extraction (or a denormalized column) bounds the cost to per-chat scale. -- **`_witness_role_for` defensive coding (from T71 review)**: helper returns `"guest"` when `host_bot_id is None`, which is wrong for Phase-1 chats. Defensive: `return "host" if host_bot_id is None or speaker_bot_id == host_bot_id else "guest"`. Not exercised by current tests; harden as a precaution. -- **Confidence type tightening (from T74 review)**: `chat/services/addressee.py::AddresseeDecision.confidence` could be typed as `Literal["high","medium","low"]` for stricter validation. Currently `str` with a comment. -- **Scene-close-on-cancel UX revisit**: T74.3 pinned the existing behavior (close fires even on cancel). If real play-testing surfaces a regression, revisit. +All items shipped — see Phase 3.5 status below. ## Phase 3 status @@ -249,51 +241,49 @@ Phase 3 shipped end-to-end across 19 tasks (T49–T67). Events with full lifecyc ### Phase 3.5 / 4 backlog -New follow-ups discovered during Phase 3 reviews and execution. None are blocking; pick up at any time. +All items shipped — see Phase 3.5 status below. -#### From T53 review +## Phase 3.5 status -- **`narrate_skip` `timeout_s` not piped through to `client.generate`**: parameter accepted but ignored. Fix: pass `timeout_s=timeout_s` to `client.generate(**...)`, or drop the parameter entirely if Featherless's client doesn't honor it. +Phase 3.5 cleanup shipped end-to-end across 12 tasks (T76–T87). Two CLAUDE.md backlogs (Phase 2.6/3, Phase 3.5/4) are now empty; deferred follow-ups discovered during execution are tracked in a new "Phase 3.6 / 4 backlog" section below. Test count grew from 315 (Phase 3) to 343 (+28 new tests). -#### From T57 review +- **Wave 1 — trivial polish (parallel)**: + - **T76** `narrate_skip` `timeout_s` plumbed through to `client.generate`. + - **T77** `AddresseeDecision.confidence` typed as `Literal["high","medium","low"]`. + - **T78** `search_memories` docstring notes SQL-side significance bias (`SIGNIFICANCE_RANK_BIAS`). + - **T79** `_witness_role_for` defensive `host_bot_id is None` handling (returns `"host"` for Phase-1 chats). +- **Wave 2 — scene_summarize polish (single)**: + - **T80** five T58 follow-ups: re-close suffix bloat guard, transcript scoping by scene, swallowed-exception logging in `detect_threads`, chat-clock `closed_at`, and three new tests covering T58 gaps (200-char truncation, `thread_updated`/`thread_closed` candidate paths, try/except fallback). +- **Wave 3 — typed exception (single)**: + - **T81** `ChatNotFoundError` replaces string-prefix sniff in skip routes; mapped to 404 (vs 400 for other `ValueError` cases). +- **Wave 4 — turn-flow wiring (single)**: + - **T82** `consume_pending_meanwhile_digests` wired into `post_turn` (closes T66 gap; meanwhile digests no longer pile up); natural-language skip dispatch now runs scene close detection first. +- **Wave 5 — regenerate polish (single)**: + - **T83** five sub-fixes — cancel/stop hook (regenerate registers stream task in `_in_flight_tasks`); DRY extraction of `read_recent_dialogue` and `gather_prior_edges` into `chat/services/turn_common.py`; chat-scoped sibling-assistant-turn lookup; lifecycle-rollback warning log on regenerate; ordering-symmetry comment between post_turn and regenerate event-detection paths. +- **Wave 6 — final polish (parallel)**: + - **T84** unified `record_turn_memory` API with `you_present` kwarg; `record_meanwhile_memory` becomes a thin wrapper. + - **T85** JSON-build audit (no findings) + meanwhile cancel route-level test. + - **T86** frontend `turn_html_replace` SSE handler + turn_id stamping on rendered HTML so the in-place swap actually works. -- **`search_memories` docstring should mention SQL-side significance bias**: the function docstring still describes only the Python composite re-rank; add a one-line note about `SIGNIFICANCE_RANK_BIAS`. +### Phase 3.6 / 4 backlog -#### From T58 review +New follow-ups discovered during Phase 3.5 reviews and execution. None are blocking; pick up at any time. -- **Scene close re-close suffix bloat risk**: `_build_key_quotes_suffix` reads from `memories.pov_summary`. If a scene close runs twice, the second pass would read the rewritten text plus the previous "Key quotes:" suffix and append a second one. Either guard for double-suffix or source quotes from `event_log` `assistant_turn`/`user_turn` text instead. -- **Thread detection transcript scoping**: `_read_recent_dialogue` returns chat-wide history with no `scene_id` filter (Phase 1 turns lack one). Feeding chat-wide history to `detect_threads` will misattribute threads to the closing scene when the scene boundary falls inside the last 50 turns. Scope by `scene_id` once turns carry it, or by `started_at` against scene-open timestamp. -- **Swallowed exceptions in `detect_threads` try/except**: bare `Exception` swallows programmer errors silently. Log at debug level so silent regressions are recoverable. -- **Scene close `closed_at` clock divergence**: T58 uses `datetime.now(timezone.utc).isoformat()` instead of chat-clock time. Diverges from chat-clock semantics elsewhere; revisit if event reconstructions need chat-clock ordering. -- **Test coverage gaps in T58**: no test for 200-char quote truncation; no test for `thread_updated`/`thread_closed` candidate paths; no test for the `try/except` fallback. +#### From T80 review -#### From T61 review +- **`read_recent_dialogue` chat-id pushdown**: helper filters `chat_id` post-fetch in Python. Could push the `json_extract(payload_json, '$.chat_id') = ?` predicate into SQL (matching T83.3's pattern) for tighter LIMIT semantics. Currently a chat-with-many-other-chats can have its 50-row LIMIT consumed by foreign rows. +- **Lifecycle warning wording in regenerate**: T83.4's warning log lists ALL lifecycle event ids that exist after the original `assistant_turn` id, not just ones produced by the superseded turn. For the typical "regenerate the most recent" flow these are identical, but if a user regenerates an OLDER turn, the warning will list intervening-turn lifecycle events that legitimately stand. Tighten warning wording to "lifecycle transitions at-or-after turn X" (operator-friendly); a code-level fix would require a schema change to add explicit back-reference from lifecycle events to their producing turn. -- **Regenerate doesn't roll back lifecycle transitions from superseded turn**: `event_started`/`event_completed` rows from a superseded turn remain. Phase 3.5 should add a lifecycle-undo step. Caveat: regenerate-after-completion may double-emit promotion artifacts if the new text re-completes the same event. -- **Asymmetry in event-detection ordering**: post_turn runs lifecycle BETWEEN interjection and scene-close; regenerate runs lifecycle at the END. Benign because regenerate has no scene-close path, but worth tidying. +#### From T84 review -#### From T62 review +- **`record_turn_memory` legacy single-bot function** still exists alongside the unified `record_turn_memory_for_present`. Could be consolidated in a follow-up. -- **Error-message prefix sniff for 404 vs 400 routing**: drawer skip routes use `str(exc).startswith("chat not found")` to distinguish 404 from 400. Fragile if error wording changes. Use a typed exception subclass. -- **Skip command bypasses scene close detection**: a user typing "fade out, skip an hour" would skip without closing the scene. Acceptable for Phase 3 but worth noting. +#### From T86 fix-up -#### From T63 review +- **Test fixtures + `tests/test_phase3_integration.py`** that seed turns directly via `append_event`+`project` may need updating once any new test asserts the rendered HTML carries the new turn ids end-to-end. Existing tests pass because they don't read the stamped attribute, but they're brittle if the contract evolves. -- **`participants_json` JSON injection** (FIXED in T63 but worth noting in backlog as a "double-check other JSON-string-build sites" task): T63 originally used f-string interpolation; fixed to use `json.dumps`. Audit other state modules for similar patterns. +#### Deferred items (carry-over) -#### From T64 review - -- **`record_meanwhile_memory` and `record_turn_memory_for_present` share private `_write_one_memory` helper**: minor DRY note; both helpers are similar enough that a unified API with a `you_present: bool` kwarg might be cleaner long-term. -- **Stop button cancellation for meanwhile turns**: T64 fix-up registered tasks in `_in_flight_tasks`; verify the `/turns/cancel` endpoint actually cancels meanwhile streams (the test pins registration but not the cancel-from-route path). - -#### From cross-feature interactions discovered in Wave 6b merge - -- **Cross-feature canned-queue brittleness**: meanwhile-scene close test required a canned response for T65's digest call after T64+T65 merge. Future close-path additions will keep extending the queue; consider a structured fixture builder rather than positional canned arrays. - -#### From T66 integration tests - -- **`consume_pending_meanwhile_digests` is defined but NOT wired into `post_turn`**: the helper lives in `chat/services/prompt.py` (T65) but `chat/web/turns.py` never calls it. Meanwhile digests stay pending forever in production. Phase 3.5 should call the helper after the first you-turn following a meanwhile close — probably right after the assistant_turn lands but before the next prompt assembly. Pinned by `tests/test_phase3_integration.py::test_meanwhile_close_digest_surfaces_then_consumed` which currently calls the helper directly. - -#### Discovered during Phase 3 execution - -- **`_witness_role_for` defensive `host_bot_id is None`** (carry-over from Phase 2.5 T71 backlog) — still pending. +- **Scene-close-on-cancel UX revisit** (Phase 2.5 carry-over): T74.3 pinned the existing behavior; revisit if real play-testing surfaces a regression. +- **Cross-feature canned-queue brittleness**: meanwhile-scene close test required a canned response for T65's digest call after T64+T65 merge. Future close-path additions will keep extending the queue. Consider a structured fixture builder rather than positional canned arrays. NOT addressed in Phase 3.5. +- **Lifecycle-transition rollback in regenerate**: T83.4 added a warning log; actual rollback (with proper schema linkage from lifecycle event back to producing turn) is Phase 4 work. diff --git a/chat/services/addressee.py b/chat/services/addressee.py index e085d79..1cf1199 100644 --- a/chat/services/addressee.py +++ b/chat/services/addressee.py @@ -22,6 +22,8 @@ from a fallback. from __future__ import annotations +from typing import Literal + from pydantic import BaseModel from chat.llm.classify import classify @@ -39,7 +41,7 @@ class AddresseeDecision(BaseModel): """ addressee_id: str - confidence: str = "medium" # "high" | "medium" | "low" + confidence: Literal["high", "medium", "low"] = "medium" reason: str = "" diff --git a/chat/services/memory_write.py b/chat/services/memory_write.py index 6fc6ecf..12eed5d 100644 --- a/chat/services/memory_write.py +++ b/chat/services/memory_write.py @@ -134,17 +134,34 @@ def record_turn_memory_for_present( chat_clock_at: str | None = None, source: str = "direct", significance: int = 1, + you_present: bool = True, ) -> dict[str, tuple[int, int | None]]: - """Write a ``memory_written`` event for each present bot witness. + """Single entry-point for per-turn memory writes (T84). - Host is always written. Guest is written iff ``guest_bot_id is not - None``. Witness flags are ``[you=1, host=1, guest=1]`` when a guest - is present, ``[you=1, host=1, guest=0]`` otherwise. + Writes one ``memory_written`` event per present bot witness. Host is + always written. Guest is written iff ``guest_bot_id is not None``. + + Witness flags depend on ``you_present``: + + - ``you_present=True`` (default — Phase 1/2/3 you-scenes): the user + is a witness. Mask is ``[you=1, host=1, guest=1]`` when a guest is + present, ``[you=1, host=1, guest=0]`` otherwise. + - ``you_present=False`` (Phase 3 meanwhile scenes): the user is + absent. Mask is ``[you=0, host=1, guest=1]`` for both bots. Both + ``host_bot_id`` and ``guest_bot_id`` are required — a meanwhile + scene by definition has both bots, so passing ``guest_bot_id=None`` + with ``you_present=False`` is a programming error and raises + :class:`ValueError`. Returns a mapping ``{bot_id: (event_id, memory_id)}`` so callers can look up the freshly-projected memory id per owner without re-querying the database. """ + if not you_present and guest_bot_id is None: + raise ValueError("you_present=False requires guest_bot_id") + + witness_you = 1 if you_present else 0 + witness_host = 1 witness_guest = 1 if guest_bot_id is not None else 0 result: dict[str, tuple[int, int | None]] = {} @@ -153,8 +170,8 @@ def record_turn_memory_for_present( owner_id=host_bot_id, chat_id=chat_id, narrative_text=narrative_text, - witness_you=1, - witness_host=1, + witness_you=witness_you, + witness_host=witness_host, witness_guest=witness_guest, scene_id=scene_id, chat_clock_at=chat_clock_at, @@ -167,8 +184,8 @@ def record_turn_memory_for_present( owner_id=guest_bot_id, chat_id=chat_id, narrative_text=narrative_text, - witness_you=1, - witness_host=1, + witness_you=witness_you, + witness_host=witness_host, witness_guest=1, scene_id=scene_id, chat_clock_at=chat_clock_at, @@ -190,46 +207,22 @@ def record_meanwhile_memory( source: str = "direct", significance: int = 1, ) -> dict[str, tuple[int, int | None]]: - """Write per-POV ``memory_written`` events for a meanwhile turn (T64). + """Backward-compat thin wrapper for meanwhile memory writes (T64, T84). - A meanwhile scene runs entirely between host + guest, with "you" - absent. Both bots are present witnesses, so each one gets a row with - witness flags ``[you=0, host=1, guest=1]`` — different from the - normal-turn ``record_turn_memory_for_present`` shape, which assumes - the user is always a witness (``witness_you=1``). - - The ``guest_bot_id`` is required (a meanwhile scene by definition - has both bots) — callers passing ``None`` is a programming error. - - Returns ``{bot_id: (event_id, memory_id)}`` mirroring - :func:`record_turn_memory_for_present` so downstream queues - (significance scoring) can pull memory ids without re-querying. + Equivalent to calling :func:`record_turn_memory_for_present` with + ``you_present=False``. Kept so existing call sites in + :mod:`chat.web.meanwhile` continue to work without churn. New code + should prefer the unified entry-point directly. """ - result: dict[str, tuple[int, int | None]] = {} - result[host_bot_id] = _write_one_memory( + return record_turn_memory_for_present( conn, - owner_id=host_bot_id, chat_id=chat_id, + host_bot_id=host_bot_id, + guest_bot_id=guest_bot_id, narrative_text=narrative_text, - witness_you=0, - witness_host=1, - witness_guest=1, scene_id=scene_id, chat_clock_at=chat_clock_at, source=source, significance=significance, + you_present=False, ) - result[guest_bot_id] = _write_one_memory( - conn, - owner_id=guest_bot_id, - chat_id=chat_id, - narrative_text=narrative_text, - witness_you=0, - witness_host=1, - witness_guest=1, - scene_id=scene_id, - chat_clock_at=chat_clock_at, - source=source, - significance=significance, - ) - return result diff --git a/chat/services/prompt.py b/chat/services/prompt.py index 0ba2bbd..d27d23c 100644 --- a/chat/services/prompt.py +++ b/chat/services/prompt.py @@ -379,8 +379,15 @@ def _witness_role_for(speaker_bot_id: str, host_bot_id: str | None) -> str: pinned the contract on ``search_memories``; this helper applies it at the call site so a guest-as-speaker doesn't silently retrieve memories under the wrong POV mask. + + When ``host_bot_id`` is ``None`` (degenerate case from a half-seeded + chat or Phase-1 path), the speaker is treated as the host so the + query falls back to the host POV mask rather than silently masking + the speaker's own memories as a guest. """ - return "host" if speaker_bot_id == host_bot_id else "guest" + if host_bot_id is None or speaker_bot_id == host_bot_id: + return "host" + return "guest" def _resolve_addressee( diff --git a/chat/services/regenerate.py b/chat/services/regenerate.py index 1317903..b2aba9a 100644 --- a/chat/services/regenerate.py +++ b/chat/services/regenerate.py @@ -68,7 +68,9 @@ Phase 2.5 changes: from __future__ import annotations +import asyncio import json +import logging from sqlite3 import Connection from chat.config import Settings @@ -79,6 +81,10 @@ from chat.services.interjection import detect_interjection from chat.services.memory_write import record_turn_memory_for_present from chat.services.multi_state_update import compute_state_updates_for_present from chat.services.prompt import assemble_narrative_prompt +from chat.services.turn_common import ( + gather_prior_edges, + read_recent_dialogue, +) from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you from chat.state.events import list_active_events @@ -86,6 +92,8 @@ from chat.state.world import active_scene, get_chat from chat.web.pubsub import publish from chat.web.render import render_turn_html +_log = logging.getLogger(__name__) + async def regenerate_assistant_turn( conn: Connection, @@ -104,6 +112,19 @@ async def regenerate_assistant_turn( Raises :class:`ValueError` when the chat or the assistant_turn event cannot be found — the FastAPI route translates this to 404. + + .. note:: + **Lifecycle-rollback limitation (T83.4, Phase 4 follow-up).** + When the superseded turn already produced lifecycle transitions + (``event_started`` / ``event_completed`` / ``event_cancelled``), + this function does NOT roll those rows back before re-running + ``detect_event_transitions`` against the regenerated text. A + regenerate-after-completion can therefore double-emit promotion + artifacts if the new text re-completes the same event. Phase 3.5 + only documents the gap and emits a WARNING log naming the + affected event_log ids; the actual undo pass is invasive + (re-projection / inverse-handler dispatch) and is deferred to + Phase 4. See the ``# T83.4`` block below for the warning emit. """ chat = get_chat(conn, chat_id) if chat is None: @@ -136,6 +157,40 @@ async def regenerate_assistant_turn( original_assistant_payload = json.loads(row[0]) original_user_turn_id = original_assistant_payload.get("user_turn_id") + # T83.4: scan for downstream lifecycle transitions emitted by the + # superseded turn — they're not being rolled back (see method + # docstring). Heuristic: any ``event_started`` / ``event_completed`` + # / ``event_cancelled`` event_log row with id strictly greater than + # the original assistant_turn's id was emitted as part of (or after) + # that turn's processing. Lifecycle events don't carry ``chat_id`` + # in their payload (their payload references an ``event_id`` FK to + # the ``events`` table, which holds chat_id), so we join through + # ``events`` to scope to this chat. + # + # A WARNING log surfaces the affected event ids so operators can + # spot double-emit cases until the Phase 4 rollback pass lands. + unrolled_lifecycle = conn.execute( + "SELECT el.id, el.kind FROM event_log AS el " + "JOIN events AS ev " + " ON ev.event_id = json_extract(el.payload_json, '$.event_id') " + "WHERE el.kind IN (" + " 'event_started', 'event_completed', 'event_cancelled'" + " ) " + " AND ev.chat_id = ? " + " AND el.id > ? " + "ORDER BY el.id ASC", + (chat_id, original_assistant_event_id), + ).fetchall() + if unrolled_lifecycle: + _log.warning( + "regenerate_assistant_turn: %d lifecycle transition(s) from " + "superseded turn %s are NOT being rolled back (Phase 4 " + "follow-up). Affected event ids: %s", + len(unrolled_lifecycle), + original_assistant_event_id, + [r[0] for r in unrolled_lifecycle], + ) + # 1a. Look up any sibling interjection beat in the same turn group # (T73.2). The original group is (primary + optional interjection), # both pinned to the same ``user_turn_id``. The interjection has a @@ -143,6 +198,13 @@ async def regenerate_assistant_turn( # the silent witness (the bot that wasn't the primary addressee). # Filter on ``superseded_by IS NULL`` so prior regenerates of this # group don't reappear as siblings. + # + # T83.3: push the chat_id filter into SQL via ``json_extract`` so + # the query doesn't scan every assistant_turn row across the whole + # database. ``LIMIT 50`` bounds worst-case work even when chat_id + # isn't selective (e.g. a single chat with many turns) — we only + # need the one matching sibling. Mirrors the SQL pattern in + # ``chat.web.meanwhile._last_meanwhile_speaker``. original_interjection_event_id: int | None = None original_interjection_payload: dict | None = None if original_user_turn_id is not None: @@ -150,8 +212,11 @@ async def regenerate_assistant_turn( "SELECT id, payload_json FROM event_log " "WHERE kind = 'assistant_turn' " " AND id != ? " - " AND superseded_by IS NULL", - (original_assistant_event_id,), + " AND superseded_by IS NULL " + " AND json_extract(payload_json, '$.chat_id') = ? " + "ORDER BY id DESC " + "LIMIT 50", + (original_assistant_event_id, chat_id), ) for sib_id, sib_payload_json in sibling_cur.fetchall(): sib_payload = json.loads(sib_payload_json) @@ -208,33 +273,30 @@ async def regenerate_assistant_turn( # assistant_turn explicitly (we haven't superseded it yet — that # update lands at the end so the new event_id is known) and use the # standard ``superseded_by IS NULL AND hidden = 0`` filter so any - # prior regenerates also drop out. + # prior regenerates also drop out. T83.2: shared helper handles the + # SQL + filtering; we post-process to map speaker ids to display + # names for the prompt. you_entity = get_you(conn) or {"name": "you", "persona": ""} you_name = you_entity.get("name", "you") - cur = conn.execute( - "SELECT id, kind, payload_json FROM event_log " - "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') " - " AND id != ? " - " AND superseded_by IS NULL AND hidden = 0 " - "ORDER BY id DESC LIMIT 20", - (original_assistant_event_id,), + raw_recent = read_recent_dialogue( + conn, + chat_id, + limit=20, + exclude_event_id=original_assistant_event_id, ) - rows = list(reversed(cur.fetchall())) recent: list[dict] = [] - for _eid, kind, payload_json in rows: - p = json.loads(payload_json) - if p.get("chat_id") != chat_id: + for entry in raw_recent: + spk = entry.get("speaker", "bot") + if spk == "you": + recent.append({"speaker": you_name, "text": entry.get("text", "")}) continue - if kind in ("user_turn", "user_turn_edit"): - recent.append({"speaker": you_name, "text": p.get("prose", "")}) - else: - spk = p.get("speaker_id", "bot") + if spk == host_bot_id: spk_name = host_bot.get("name", "bot") - if spk == host_bot_id: - spk_name = host_bot.get("name", "bot") - elif guest_bot is not None and spk == guest_bot.get("id"): - spk_name = guest_bot.get("name", "bot") - recent.append({"speaker": spk_name, "text": p.get("text", "")}) + elif guest_bot is not None and spk == guest_bot.get("id"): + spk_name = guest_bot.get("name", "bot") + else: + spk_name = host_bot.get("name", "bot") + recent.append({"speaker": spk_name, "text": entry.get("text", "")}) # 4. Assemble the narrative prompt. ``recent`` already excludes the # current user prose, which we pass through ``user_turn_prose``. @@ -250,19 +312,37 @@ async def regenerate_assistant_turn( guest_id=guest_bot_id, ) - # 5. Stream the new narrative. + # 5. Stream the new narrative. T83.1: register the streaming Task in + # the chat-keyed in-flight registry so POST /chats//turns/cancel + # can call ``.cancel()`` on a mid-regenerate stream. We import the + # underscore name from turns.py deliberately — same single-process + # registry the cancel route reads, mirrors the meanwhile registration + # pattern in chat/web/meanwhile.py. + from chat.web.turns import _in_flight_tasks # noqa: PLC0415 + accumulated: list[str] = [] - async for chunk in client.stream( - messages, - model=settings.narrative_model, - max_tokens=settings.narrative_max_tokens, - temperature=settings.narrative_temperature, - ): - accumulated.append(chunk) - await publish( - chat_id, - {"event": "token", "text": chunk, "speaker_id": speaker_bot_id}, - ) + + async def _stream_primary() -> None: + async for chunk in client.stream( + messages, + model=settings.narrative_model, + max_tokens=settings.narrative_max_tokens, + temperature=settings.narrative_temperature, + ): + accumulated.append(chunk) + await publish( + chat_id, + {"event": "token", "text": chunk, "speaker_id": speaker_bot_id}, + ) + + stream_task = asyncio.create_task(_stream_primary()) + _in_flight_tasks[chat_id] = stream_task + try: + await stream_task + finally: + # Always unregister so a subsequent turn / regenerate can register + # a fresh task. Mirrors the cleanup in turns.py::post_turn. + _in_flight_tasks.pop(chat_id, None) new_text = "".join(accumulated) # 6. Append the new assistant_turn event. ``user_turn_id`` points at @@ -301,7 +381,10 @@ async def regenerate_assistant_turn( speaker_bot.get("name", "bot") if speaker_bot is not None else "bot" ) new_turn_html = render_turn_html( - speaker_name_for_render, new_text, role="bot" + speaker_name_for_render, + new_text, + role="bot", + event_id=new_assistant_event_id, ) await publish( chat_id, @@ -354,17 +437,8 @@ async def regenerate_assistant_turn( present_names[guest_bot_id] = guest_bot.get("name", "bot") personas[guest_bot_id] = guest_bot.get("persona") or "" - prior_edges: dict[tuple[str, str], dict] = {} - for src in present_ids: - for tgt in present_ids: - if src == tgt: - continue - edge = get_edge(conn, src, tgt) or { - "affinity": 50, - "trust": 50, - "summary": "", - } - prior_edges[(src, tgt)] = edge + # T83.2: shared helper builds the directed-pair edge dict. + prior_edges = gather_prior_edges(conn, present_ids) state_updates = await compute_state_updates_for_present( client, @@ -453,34 +527,27 @@ async def regenerate_assistant_turn( ) if decision.should_interject: - # Re-read recent so the just-appended primary is in the prompt. - interject_cur = conn.execute( - "SELECT id, kind, payload_json FROM event_log " - "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') " - " AND superseded_by IS NULL AND hidden = 0 " - "ORDER BY id DESC LIMIT 20", - ) - interject_rows = list(reversed(interject_cur.fetchall())) + # Re-read recent so the just-appended primary is in the + # prompt. T83.2: shared helper + the same id->name mapping + # as the primary read above. + raw_interject = read_recent_dialogue(conn, chat_id, limit=20) interject_recent: list[dict] = [] - for _eid, kind, payload_json in interject_rows: - p = json.loads(payload_json) - if p.get("chat_id") != chat_id: + for entry in raw_interject: + spk = entry.get("speaker", "bot") + if spk == "you": + interject_recent.append( + {"speaker": you_name, "text": entry.get("text", "")} + ) continue - if kind in ("user_turn", "user_turn_edit"): - interject_recent.append( - {"speaker": you_name, "text": p.get("prose", "")} - ) + if spk == host_bot_id: + spk_name = host_bot.get("name", "bot") + elif spk == guest_bot.get("id"): + spk_name = guest_bot.get("name", "bot") else: - spk = p.get("speaker_id", "bot") - if spk == host_bot_id: - spk_name = host_bot.get("name", "bot") - elif spk == guest_bot.get("id"): - spk_name = guest_bot.get("name", "bot") - else: - spk_name = "bot" - interject_recent.append( - {"speaker": spk_name, "text": p.get("text", "")} - ) + spk_name = "bot" + interject_recent.append( + {"speaker": spk_name, "text": entry.get("text", "")} + ) if interject_recent and interject_recent[-1].get("speaker") == you_name: interject_recent = interject_recent[:-1] @@ -497,21 +564,32 @@ async def regenerate_assistant_turn( ) interject_accumulated: list[str] = [] - async for chunk in client.stream( - interject_messages, - model=settings.narrative_model, - max_tokens=settings.narrative_max_tokens, - temperature=settings.narrative_temperature, - ): - interject_accumulated.append(chunk) - await publish( - chat_id, - { - "event": "token", - "text": chunk, - "speaker_id": silent_witness_id, - }, - ) + + async def _stream_interjection() -> None: + async for chunk in client.stream( + interject_messages, + model=settings.narrative_model, + max_tokens=settings.narrative_max_tokens, + temperature=settings.narrative_temperature, + ): + interject_accumulated.append(chunk) + await publish( + chat_id, + { + "event": "token", + "text": chunk, + "speaker_id": silent_witness_id, + }, + ) + + # T83.1: register the interjection sub-stream in the same + # in-flight registry so /turns/cancel collapses it too. + interject_task = asyncio.create_task(_stream_interjection()) + _in_flight_tasks[chat_id] = interject_task + try: + await interject_task + finally: + _in_flight_tasks.pop(chat_id, None) interject_text = "".join(interject_accumulated) new_interjection_event_id = append_event( @@ -541,7 +619,10 @@ async def regenerate_assistant_turn( # Broadcast a replace event so connected tabs swap the prior # interjection node in-place (mirrors T73.1's primary swap). interject_html = render_turn_html( - silent_witness.get("name", "bot"), interject_text, role="bot" + silent_witness.get("name", "bot"), + interject_text, + role="bot", + event_id=new_interjection_event_id, ) await publish( chat_id, @@ -573,17 +654,8 @@ async def regenerate_assistant_turn( "text": interject_text, } ] - prior_edges_post: dict[tuple[str, str], dict] = {} - for src in present_ids: - for tgt in present_ids: - if src == tgt: - continue - edge = get_edge(conn, src, tgt) or { - "affinity": 50, - "trust": 50, - "summary": "", - } - prior_edges_post[(src, tgt)] = edge + # T83.2: shared helper handles the directed-pair edge dict. + prior_edges_post = gather_prior_edges(conn, present_ids) state_updates_post = await compute_state_updates_for_present( client, @@ -620,23 +692,28 @@ async def regenerate_assistant_turn( (new_assistant_event_id, original_interjection_event_id), ) - # 10. Event-lifecycle detection (Phase 3, T61). Mirrors the post_turn - # block: classify whether any active events transitioned in the - # regenerated narrative and append the corresponding event_started / + # 9a. Event-lifecycle detection (Phase 3, T61). T83.5 cosmetic + # ordering: mirrors ``chat.web.turns.post_turn``'s 8a block — runs + # AFTER the interjection branch (and AFTER the post-interjection + # state-update + memory passes) so the classifier sees the same + # narrative-text input post_turn does. Numbering uses ``9a`` to + # match post_turn's ``8a`` shape (the interjection branch is step 9 + # in regenerate vs step 8 in post_turn; lifecycle is the immediate + # follow-on in both). Behaviour identical to the prior ``step 10`` + # placement — the block was already structurally last in regenerate + # because there's no scene-close pass here. + # + # Classify whether any active events transitioned in the regenerated + # narrative and append the corresponding event_started / # event_completed / event_cancelled. ``promote_completed_event`` # runs inline after a completion so promotion artifacts land in the # same regenerate path. # - # Phase 3.5 follow-up: when a regenerate replaces a turn that had - # already produced event transitions, those original transitions are - # NOT undone here. The superseded ``assistant_turn`` group keeps its - # prior ``event_started`` / ``event_completed`` events in the log - # (they remain projected onto the events table). Phase 3.5 will add - # an "undo lifecycle" step to roll back the prior transitions before - # re-classifying the regenerated text. For v3 we accept that a - # regenerate-after-completion will double-emit promotion artifacts - # if the new text re-completes the same event — narratively rare, - # and a true fix needs the lifecycle-undo pass. + # T83.4 follow-up: when a regenerate replaces a turn that had + # already produced event transitions, those original transitions + # are NOT undone here (Phase 4 work). A WARNING log earlier in this + # function names the affected event_log ids — see the T83.4 block + # near the function entry. new_active_events = list_active_events(conn, chat_id) if new_active_events: lifecycle_decision = await detect_event_transitions( diff --git a/chat/services/scene_summarize.py b/chat/services/scene_summarize.py index 386cf51..7551f8b 100644 --- a/chat/services/scene_summarize.py +++ b/chat/services/scene_summarize.py @@ -29,6 +29,7 @@ keeps moving. from __future__ import annotations import json +import logging import uuid from datetime import datetime, timezone from sqlite3 import Connection @@ -39,6 +40,8 @@ from chat.eventlog.log import append_and_apply from chat.llm.classify import classify from chat.llm.client import LLMClient +_log = logging.getLogger(__name__) + class ScenePOVSummary(BaseModel): """Classifier output: one witness's view of a closing scene. @@ -123,7 +126,11 @@ async def summarize_scene( def _read_recent_dialogue( - conn: Connection, chat_id: str, *, limit: int = 50 + conn: Connection, + chat_id: str, + *, + limit: int = 50, + since_event_id: int | None = None, ) -> list[dict]: """Pull the last ``limit`` user/assistant turns for ``chat_id``. @@ -132,14 +139,29 @@ def _read_recent_dialogue( the most recent turns of the chat. Superseded and hidden rows are filtered out so regenerated turns (T29) don't bleed into the summary. + + T80.2: ``since_event_id`` clamps the result to event_log rows whose + ``id >= since_event_id`` so callers needing a scene-scoped view (e.g. + thread detection on close) don't pull turns that landed before the + closing scene's ``scene_opened`` event. """ - cur = conn.execute( - "SELECT kind, payload_json FROM event_log " - "WHERE kind IN ('user_turn', 'assistant_turn') " - " AND superseded_by IS NULL AND hidden = 0 " - "ORDER BY id DESC LIMIT ?", - (limit,), - ) + if since_event_id is None: + cur = conn.execute( + "SELECT kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'assistant_turn') " + " AND superseded_by IS NULL AND hidden = 0 " + "ORDER BY id DESC LIMIT ?", + (limit,), + ) + else: + cur = conn.execute( + "SELECT kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'assistant_turn') " + " AND superseded_by IS NULL AND hidden = 0 " + " AND id >= ? " + "ORDER BY id DESC LIMIT ?", + (since_event_id, limit), + ) rows = list(reversed(cur.fetchall())) out: list[dict] = [] for kind, payload_json in rows: @@ -158,6 +180,65 @@ def _read_recent_dialogue( return out +def _scene_opened_event_id( + conn: Connection, chat_id: str, scene_id: int +) -> int | None: + """Return the event_log id of the ``scene_opened`` (or + ``meanwhile_scene_started``) event that created scene row + ``scene_id``. Used by T80.2 to lower-bound dialogue reads to a + single scene's transcript. + + ``meanwhile_scene_started`` carries an explicit ``scene_id`` so we + match on that directly. ``scene_opened`` doesn't, so we walk the + chat's scene rows in id order and zip against the chat's scene-open + events in id order — the projector creates one scene row per + scene-open event, so positions correspond. + + Returns ``None`` when no matching event is found; callers should + treat that as "fall back to chat-wide" rather than over-filter. + """ + # Fast path for meanwhile children (explicit scene_id in payload). + for ev_id, payload_json in conn.execute( + "SELECT id, payload_json FROM event_log " + "WHERE kind = 'meanwhile_scene_started' " + " AND superseded_by IS NULL AND hidden = 0", + ).fetchall(): + try: + p = json.loads(payload_json) + except (TypeError, ValueError): + continue + if p.get("chat_id") == chat_id and p.get("scene_id") == scene_id: + return ev_id + # Fallback for parent you-scenes: zip chat-scoped scene-open events + # against chat-scoped scene rows in id order. + chat_scene_ids = [ + r[0] + for r in conn.execute( + "SELECT id FROM scenes WHERE chat_id = ? ORDER BY id ASC", + (chat_id,), + ).fetchall() + ] + if scene_id not in chat_scene_ids: + return None + chat_open_evs: list[int] = [] + for ev_id, _kind, payload_json in conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE kind IN ('scene_opened', 'meanwhile_scene_started') " + " AND superseded_by IS NULL AND hidden = 0 " + "ORDER BY id ASC", + ).fetchall(): + try: + p = json.loads(payload_json) + except (TypeError, ValueError): + continue + if p.get("chat_id") == chat_id: + chat_open_evs.append(ev_id) + idx = chat_scene_ids.index(scene_id) + if idx < len(chat_open_evs): + return chat_open_evs[idx] + return None + + async def _summarize_and_apply_for_witness( conn: Connection, client: LLMClient, @@ -213,7 +294,11 @@ async def _summarize_and_apply_for_witness( # Empty default -> skip the memory rewrite; the seeded # per-turn pov_summary stays in place. continue - new_value = pov.summary + key_quotes_suffix + # T80.1: a prior close may have already appended a Key quotes + # suffix to this row's pov_summary. Strip it here so the fresh + # rewrite replaces the existing suffix rather than stacking a + # second one on top. + new_value = _strip_key_quotes_suffix(pov.summary) + key_quotes_suffix append_and_apply( conn, kind="manual_edit", @@ -263,6 +348,31 @@ async def _summarize_and_apply_for_witness( return pov +# T80.1: header marker shared by the suffix builder and the +# witness-write strip step. Any text starting with this marker is treated +# as a previously-appended Key quotes suffix and stripped before reuse so +# repeated scene closes don't compose recursive bloat. +_KEY_QUOTES_HEADER = "\n\nKey quotes:\n" + + +def _strip_key_quotes_suffix(text: str) -> str: + """Remove a previously-appended Key quotes suffix from ``text``. + + Returns ``text`` unchanged when the marker is absent, or the prefix + up to (but not including) the marker when present. Used in two + places: (1) when sourcing quote text from a memory row that may + already carry the suffix from a prior close, and (2) when computing + the per-POV rewrite's prior_value so the new write replaces — rather + than stacks on — the old suffix. + """ + if not text: + return text + idx = text.find(_KEY_QUOTES_HEADER) + if idx >= 0: + return text[:idx] + return text + + def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str: """If the scene's max-turn-significance is >= 2, build the "Key quotes:" suffix from the top-3 highest-significance memory rows @@ -274,6 +384,10 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str: per-turn narrative seeded by T21, since this helper is called BEFORE the per-POV rewrite. Texts are truncated to 200 chars to bound memory row growth across many witnesses. + + T80.1: candidate text is run through :func:`_strip_key_quotes_suffix` + first so a re-close (whose source memories already carry a suffix from + the prior close) doesn't quote a quote. """ row = conn.execute( "SELECT MAX(significance) FROM memories WHERE scene_id = ?", @@ -288,7 +402,7 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str: (scene_id,), ) quotes = [ - (r[0] or "")[:200] + _strip_key_quotes_suffix(r[0] or "")[:200] for r in cur.fetchall() ] if not quotes: @@ -454,20 +568,35 @@ async def apply_scene_close_summary( }, ) - # T58.2: thread detection on close. Reuses the dialogue we already - # gathered for per-POV summarization — same {speaker, text} shape - # detect_threads expects. Failure-tolerant: classify() returns the - # empty default on retry-exhaustion, and the broad except below - # protects the close pipeline from any other classifier/mock flap. + # T58.2: thread detection on close. Failure-tolerant: classify() + # returns the empty default on retry-exhaustion, and the broad except + # below protects the close pipeline from any other classifier/mock + # flap. + # + # T80.2: thread detection runs against a SCENE-SCOPED transcript, + # not the chat-wide last-50 turns used by the per-POV summaries. + # Mis-attributing threads when scene boundaries fall inside the last + # 50 turns would otherwise close threads opened in a prior scene. + scene_open_ev_id = _scene_opened_event_id(conn, chat_id, scene_id) + if scene_open_ev_id is not None: + scene_dialogue = _read_recent_dialogue( + conn, chat_id, since_event_id=scene_open_ev_id + ) + else: + scene_dialogue = dialogue try: thread_result = await detect_threads( client, classifier_model=classifier_model, - scene_transcript=dialogue, + scene_transcript=scene_dialogue, open_threads=list_open_threads(conn, chat_id), timeout_s=timeout_s, ) - except Exception: + except Exception as exc: + # T80.3: log the swallowed exception at DEBUG so a + # programmer-error flap (e.g. wrong kwarg name) surfaces in + # local logs without breaking the close pipeline. + _log.debug("detect_threads failed: %s", exc, exc_info=True) from chat.services.thread_detection import ThreadDetectionResult thread_result = ThreadDetectionResult() @@ -495,12 +624,20 @@ async def apply_scene_close_summary( }, ) elif cand.action == "close" and cand.existing_thread_id: + # T80.4: chat-clock time, not wall clock — the rest of the + # close pipeline (memories, edges, scene_closed payloads) + # uses chat["time"] so threads must agree. Falls back to + # UTC now only when the chat row has no clock yet (defensive + # — chat_state always seeds "time" via chat_created). + chat_clock_at = chat.get("time") or datetime.now( + timezone.utc + ).isoformat() append_and_apply( conn, kind="thread_closed", payload={ "thread_id": cand.existing_thread_id, - "closed_at": datetime.now(timezone.utc).isoformat(), + "closed_at": chat_clock_at, }, ) diff --git a/chat/services/skip_narration.py b/chat/services/skip_narration.py index 4590a7b..9fd191e 100644 --- a/chat/services/skip_narration.py +++ b/chat/services/skip_narration.py @@ -96,6 +96,7 @@ async def narrate_skip( model=narrative_model, max_tokens=200, temperature=0.7, + timeout_s=timeout_s, ) text = (result or "").strip() if not text: diff --git a/chat/services/turn_common.py b/chat/services/turn_common.py new file mode 100644 index 0000000..e246314 --- /dev/null +++ b/chat/services/turn_common.py @@ -0,0 +1,125 @@ +"""Shared helpers for turn flows (T83.2). + +Both ``chat.web.turns.post_turn`` and +``chat.services.regenerate.regenerate_assistant_turn`` need to: + +1. Pull a chronological tail of user-side and assistant_turn events for + prompt assembly + state-update inputs. +2. Build a directed-edge dict over a fixed set of "present" entity ids + for the multi-pair state-update pass (with the schema 50/50 default + filled in for missing rows). + +Before T83.2 each call site had its own copy of these blocks. The two +copies drifted on details (T73.1 added ``user_turn_edit`` handling to +turns.py; regenerate.py had a slightly different recent-window query). +This module is the single source so a future change to either lands in +both flows by construction. + +Note on overlap with ``chat.services.scene_summarize._read_recent_dialogue``: +that helper has a ``since_event_id`` clamp (T80.2 thread-detection +scope) and intentionally does NOT include ``user_turn_edit`` events — +its callers want the *original* prose, not edits. Deduplicating it +into here would either (a) require a new flag on the shared helper for +``user_turn_edit`` inclusion, or (b) silently change scene_summarize's +read shape. Both feel more invasive than the duplication is bad, so +that helper is left alone for now. +""" + +from __future__ import annotations + +import json +from sqlite3 import Connection + +from chat.state.edges import get_edge + + +def read_recent_dialogue( + conn: Connection, + chat_id: str, + *, + limit: int = 50, + exclude_event_id: int | None = None, +) -> list[dict]: + """Pull the last ``limit`` user-side / assistant_turn events for + ``chat_id`` as ``[{"speaker": , "text": }]``, + chronologically ordered (oldest first). + + Filters: ``superseded_by IS NULL AND hidden = 0`` — regenerated + rows drop out so the timeline reflects the current state. Includes + ``user_turn``, ``user_turn_edit`` (T29 edited prose substitutes for + the original — the original is marked superseded above), and + ``assistant_turn`` rows. + + ``exclude_event_id`` is an optional event_log id to skip — used by + regenerate to drop the original assistant_turn from its prompt + context window before that row has been marked superseded (the + supersede UPDATE lands at the end so the new event_id is known). + """ + if exclude_event_id is None: + cur = conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') " + " AND superseded_by IS NULL AND hidden = 0 " + "ORDER BY id DESC LIMIT ?", + (limit,), + ) + else: + cur = conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') " + " AND id != ? " + " AND superseded_by IS NULL AND hidden = 0 " + "ORDER BY id DESC LIMIT ?", + (exclude_event_id, limit), + ) + rows = list(reversed(cur.fetchall())) + out: list[dict] = [] + for row_id, kind, payload_json in rows: + p = json.loads(payload_json) + if p.get("chat_id") != chat_id: + continue + if kind in ("user_turn", "user_turn_edit"): + out.append( + { + "speaker": "you", + "text": p.get("prose", ""), + "event_id": row_id, + } + ) + else: + out.append( + { + "speaker": p.get("speaker_id", "bot"), + "text": p.get("text", ""), + "event_id": row_id, + } + ) + return out + + +def gather_prior_edges( + conn: Connection, present_ids: list[str] +) -> dict[tuple[str, str], dict]: + """Build ``{(src, tgt): {affinity, trust, summary}}`` for every + directed pair where both ``src`` and ``tgt`` are in ``present_ids`` + and ``src != tgt``. + + Missing rows fall back to the schema default 50/50 baseline (mirrors + the Phase 1 single-pair flow). Used by post_turn and regenerate to + seed the multi-pair state-update classifier. + """ + prior_edges: dict[tuple[str, str], dict] = {} + for src in present_ids: + for tgt in present_ids: + if src == tgt: + continue + edge = get_edge(conn, src, tgt) or { + "affinity": 50, + "trust": 50, + "summary": "", + } + prior_edges[(src, tgt)] = edge + return prior_edges + + +__all__ = ["read_recent_dialogue", "gather_prior_edges"] diff --git a/chat/state/memory.py b/chat/state/memory.py index 0eda418..5310965 100644 --- a/chat/state/memory.py +++ b/chat/state/memory.py @@ -125,6 +125,16 @@ def search_memories( so that stronger candidates yield smaller composite scores; the result is sorted ascending and truncated to ``k``. The unmodified ``fts_rank`` and a debug-friendly ``composite_score`` are kept on each returned dict. + + The result ordering applies TWO independent significance boosts: + + * **SQL-side** — ``ORDER BY (rank - significance * SIGNIFICANCE_RANK_BIAS)`` + pushes higher-significance memories ahead in the FTS5 candidate set so + the over-fetch already prefers them for tied / near-tied BM25 ranks + (T57, §11.1). + * **Python-side** — a composite re-rank with ``_SIGNIFICANCE_WEIGHT`` + reinforces the ordering after candidate retrieval, alongside the + recency boost above. """ if witness_role not in _VALID_WITNESS_ROLES: raise ValueError( diff --git a/chat/templates/chat.html b/chat/templates/chat.html index c2f622c..7c27470 100644 --- a/chat/templates/chat.html +++ b/chat/templates/chat.html @@ -17,7 +17,7 @@

No turns yet. Start typing below.

{% else %} {% for turn in turns %} -
+ {{ turn.speaker }} {{ turn.text|render_prose|safe }}
@@ -119,6 +119,39 @@ document.querySelector('.drawer-toggle')?.addEventListener('click', (e) => { } }); + // T86: live-swap regenerated turns. The backend (chat/services/ + // regenerate.py) broadcasts a ``turn_html_replace`` SSE frame after + // appending the new assistant_turn — JSON payload of shape + // ``{data: , turn_id: , supersedes_id: }``. + // We replace the prior turn's DOM node in-place when we can locate + // it by id, otherwise fall back to appending so a tab opened mid- + // regenerate still shows the new turn. The renderer + // (chat/web/render.py::render_turn_html) and the Jinja loop above + // both stamp ``id="turn-"`` on each turn DIV, so the + // primary in-place swap path is the live one — the append fallback + // only kicks in when a tab opened AFTER the regenerate started (no + // prior turn DOM node to replace). + shell.addEventListener('htmx:sseMessage', (e) => { + if (e.detail.type !== 'turn_html_replace') return; + let data; + try { data = JSON.parse(e.detail.data); } catch (_) { return; } + const html = (data && data.data) || ''; + const trimmed = html.trim(); + if (!trimmed) return; + const oldNode = document.getElementById('turn-' + data.supersedes_id); + if (oldNode) { + const tmpl = document.createElement('template'); + tmpl.innerHTML = trimmed; + const newNode = tmpl.content.firstChild; + if (newNode) oldNode.replaceWith(newNode); + } else { + // Fallback: append if the prior turn isn't in the DOM (e.g. user + // opened the tab AFTER the regenerate started, or the renderer + // hasn't yet stamped per-turn ids — see comment above). + timeline.insertAdjacentHTML('beforeend', trimmed); + } + }); + // SSE connection lost — show a banner and unlock so the user can // retry. The server commits the partial as truncated when its // request.is_disconnected() poll trips (T19). diff --git a/chat/web/chat.py b/chat/web/chat.py index 0486d8e..fea524f 100644 --- a/chat/web/chat.py +++ b/chat/web/chat.py @@ -52,12 +52,30 @@ async def chat_detail(chat_id: str, request: Request, conn=Depends(get_conn)): raw_turns = _read_recent_dialogue(conn, chat_id, limit=200) turns: list[dict] = [] for t in raw_turns: + # event_id is forwarded so the Jinja loop can stamp + # ``id="turn-"`` on each rendered turn — the + # ``turn_html_replace`` SSE handler in chat.html relies on this + # id to swap a regenerated turn in-place (T86 follow-up). if t["speaker"] == "you": - turns.append({"role": "you", "speaker": "you", "text": t["text"]}) + turns.append( + { + "role": "you", + "speaker": "you", + "text": t["text"], + "event_id": t.get("event_id"), + } + ) else: bot = get_bot(conn, t["speaker"]) label = bot["name"] if bot else t["speaker"] - turns.append({"role": "bot", "speaker": label, "text": t["text"]}) + turns.append( + { + "role": "bot", + "speaker": label, + "text": t["text"], + "event_id": t.get("event_id"), + } + ) return TEMPLATES.TemplateResponse( request, diff --git a/chat/web/drawer.py b/chat/web/drawer.py index 1098eb5..bcfdc0d 100644 --- a/chat/web/drawer.py +++ b/chat/web/drawer.py @@ -48,6 +48,7 @@ from chat.state.world import active_scene, get_activity, get_chat, get_container from chat.web.bots import get_conn from chat.web.kickoff import get_llm_client from chat.web.skip import ( + ChatNotFoundError, _now_iso, process_elision_skip, process_jump_skip, @@ -993,13 +994,12 @@ async def skip_elision( new_time=new_time, landing_state_hint=landing_state_hint, ) + except ChatNotFoundError as exc: + # Missing chat row: typed exception (T81) replaces the prior + # ``str(exc).startswith("chat not found")`` prefix sniff. + raise HTTPException(status_code=404, detail=str(exc)) except ValueError as exc: - # ``process_elision_skip`` raises on missing-chat or malformed / - # backwards new_time. The drawer used to 404 / 400 these - # separately — preserve the 404-vs-400 split by sniffing the - # error message so existing tests keep passing without changes. - if str(exc).startswith("chat not found"): - raise HTTPException(status_code=404, detail=str(exc)) + # Input-validation failure (malformed or backwards new_time). raise HTTPException(status_code=400, detail=str(exc)) return await drawer(chat_id, request, conn) @@ -1037,9 +1037,12 @@ async def skip_jump( notable_prose=notable_prose, reset_activity=reset_flag, ) + except ChatNotFoundError as exc: + # Missing chat row: typed exception (T81) replaces the prior + # ``str(exc).startswith("chat not found")`` prefix sniff. + raise HTTPException(status_code=404, detail=str(exc)) except ValueError as exc: - if str(exc).startswith("chat not found"): - raise HTTPException(status_code=404, detail=str(exc)) + # Input-validation failure (malformed or backwards new_time). raise HTTPException(status_code=400, detail=str(exc)) return await drawer(chat_id, request, conn) diff --git a/chat/web/meanwhile.py b/chat/web/meanwhile.py index 1b04a73..5c46b3e 100644 --- a/chat/web/meanwhile.py +++ b/chat/web/meanwhile.py @@ -378,7 +378,12 @@ async def process_meanwhile_turn( "truncated": truncated, }, ) - turn_html = _render_turn_html(speaker_bot["name"], text, role="bot") + turn_html = _render_turn_html( + speaker_bot["name"], + text, + role="bot", + event_id=assistant_event_id, + ) await publish(chat_id, {"event": "turn_html", "data": turn_html}) if cancelled: diff --git a/chat/web/render.py b/chat/web/render.py index 6a2a286..f39913b 100644 --- a/chat/web/render.py +++ b/chat/web/render.py @@ -84,7 +84,13 @@ def render_prose(text: str) -> str: return "".join(f"

{p}

" for p in paragraphs) -def render_turn_html(speaker: str, text: str, role: str = "bot") -> str: +def render_turn_html( + speaker: str, + text: str, + role: str = "bot", + *, + event_id: int | None = None, +) -> str: """Render a full transcript turn as ``
``. Used by both the SSE fragment publisher in :mod:`chat.web.turns` @@ -94,12 +100,19 @@ def render_turn_html(speaker: str, text: str, role: str = "bot") -> str: ``role`` selects the CSS class (``turn-you`` vs ``turn-bot``); the speaker label and role name are HTML-escaped defensively even though they currently come from trusted server-side state. + + ``event_id`` (T86 follow-up) stamps ``id="turn-"`` on the + wrapper div so the chat-page ``turn_html_replace`` SSE handler can + locate the prior turn node by id and swap it in-place. When omitted + the id attribute is dropped so SSE-only fragments without a stable + event id (legacy callers) still render cleanly. """ speaker_html = html.escape(speaker) role_html = html.escape(role) body_html = render_prose(text) + id_attr = f' id="turn-{int(event_id)}"' if event_id is not None else "" return ( - f'
' + f'' f"{speaker_html}" f"{body_html}" f"
" diff --git a/chat/web/skip.py b/chat/web/skip.py index ccbf470..b6aa179 100644 --- a/chat/web/skip.py +++ b/chat/web/skip.py @@ -36,6 +36,17 @@ from chat.state.entities import get_bot, get_you from chat.state.world import get_activity, get_chat +class ChatNotFoundError(Exception): + """Raised when a ``chat_id`` doesn't resolve to a chat row. + + Distinguishes the missing-chat case from generic input-validation + failures (which still raise :class:`ValueError`). HTTP callers map + this to ``404`` and ``ValueError`` to ``400`` — replacing the + earlier ``str(exc).startswith("chat not found")`` prefix sniff + (T81) with a typed dispatch. + """ + + def _parse_iso_time(value: str) -> datetime | None: """Permissive ISO 8601 parser shared with the drawer routes (T59). @@ -93,13 +104,14 @@ async def process_elision_skip( ..., "assistant_event_id": ...}`` so callers can introspect the generated turn (e.g. for SSE rebroadcast or test assertions). - Raises ``ValueError`` on validation failure or when the chat row - can't be located (the drawer maps it to ``HTTP 400`` / ``404`` - respectively; the natural-language path follows the same shape). + Raises :class:`ChatNotFoundError` when the chat row is missing + (HTTP ``404``) and ``ValueError`` on input-validation failure + (HTTP ``400``). Splitting the two lets the drawer route dispatch + on type instead of sniffing the error string (T81). """ chat = get_chat(conn, chat_id) if chat is None: - raise ValueError(f"chat not found: {chat_id}") + raise ChatNotFoundError(f"chat not found: {chat_id}") _validate_new_time(chat, new_time) @@ -178,11 +190,13 @@ async def process_jump_skip( Returns ``{"assistant_text": ..., "speaker_id": ..., "skip_event_id": ..., "assistant_event_id": ...}``. - Raises ``ValueError`` on validation failure (caller maps to ``400``). + Raises :class:`ChatNotFoundError` on missing chat (caller maps to + ``404``) and ``ValueError`` on input-validation failure (caller maps + to ``400``). """ chat = get_chat(conn, chat_id) if chat is None: - raise ValueError(f"chat not found: {chat_id}") + raise ChatNotFoundError(f"chat not found: {chat_id}") _validate_new_time(chat, new_time) @@ -280,6 +294,7 @@ def _now_iso() -> str: __all__ = [ + "ChatNotFoundError", "process_elision_skip", "process_jump_skip", "_now_iso", diff --git a/chat/web/turns.py b/chat/web/turns.py index 3fc51d4..94f46d4 100644 --- a/chat/web/turns.py +++ b/chat/web/turns.py @@ -64,10 +64,17 @@ from chat.services.event_promotion import promote_completed_event from chat.services.interjection import detect_interjection from chat.services.memory_write import record_turn_memory_for_present from chat.services.multi_state_update import compute_state_updates_for_present -from chat.services.prompt import assemble_narrative_prompt +from chat.services.prompt import ( + assemble_narrative_prompt, + consume_pending_meanwhile_digests, +) from chat.services.rewind import compute_rewind_preview, execute_rewind from chat.services.scene_close import detect_scene_close from chat.services.scene_summarize import apply_scene_close_summary +from chat.services.turn_common import ( + gather_prior_edges, + read_recent_dialogue, +) from chat.services.turn_parse import ParsedTurn, parse_turn from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you @@ -79,7 +86,11 @@ from chat.web.kickoff import get_llm_client from chat.web.meanwhile import process_meanwhile_turn from chat.web.pubsub import publish from chat.web.render import render_turn_html as _render_turn_html -from chat.web.skip import _parse_iso_time, process_elision_skip +from chat.web.skip import ( + ChatNotFoundError, + _parse_iso_time, + process_elision_skip, +) router = APIRouter() @@ -106,38 +117,13 @@ def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str: def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]: """Return user-side and assistant_turn events for ``chat_id``. - Includes ``user_turn``, ``user_turn_edit`` (T29 edited prose), and - ``assistant_turn``. Ordered oldest-first; superseded/hidden rows are - skipped so regenerated turns (T29) drop out of the rendered timeline. - Each entry is shaped ``{"speaker": , "text": }`` - for the prompt assembler and the chat-detail template. + T83.2: thin delegate over + :func:`chat.services.turn_common.read_recent_dialogue` so post_turn + and regenerate share one implementation. The wrapper survives so + the chat-detail template and other callers in this module don't all + have to update at once. """ - cur = conn.execute( - "SELECT id, kind, payload_json FROM event_log " - "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') " - " AND superseded_by IS NULL AND hidden = 0 " - "ORDER BY id DESC LIMIT ?", - (limit,), - ) - rows = cur.fetchall() - rows.reverse() # back to chronological order - out: list[dict] = [] - for _row_id, kind, payload_json in rows: - p = json.loads(payload_json) - if p.get("chat_id") != chat_id: - continue - if kind in ("user_turn", "user_turn_edit"): - # Edited prose substitutes for the original user_turn (the - # original is marked superseded_by and filtered above). - out.append({"speaker": "you", "text": p.get("prose", "")}) - else: - out.append( - { - "speaker": p.get("speaker_id", "bot"), - "text": p.get("text", ""), - } - ) - return out + return read_recent_dialogue(conn, chat_id, limit=limit) def _detect_addressee_id( @@ -204,17 +190,8 @@ def _gather_state_update_inputs( present_names[guest_bot["id"]] = guest_bot["name"] personas[guest_bot["id"]] = guest_bot.get("persona") or "" - prior_edges: dict[tuple[str, str], dict] = {} - for src in present_ids: - for tgt in present_ids: - if src == tgt: - continue - edge = get_edge(conn, src, tgt) or { - "affinity": 50, - "trust": 50, - "summary": "", - } - prior_edges[(src, tgt)] = edge + # T83.2: directed-edge gather is shared with regenerate.py. + prior_edges = gather_prior_edges(conn, present_ids) return present_ids, present_names, personas, prior_edges @@ -310,6 +287,49 @@ async def post_turn( ) if intent == "skip_elision": + # T82.2: run scene-close detection on the user's prose BEFORE + # the skip controller fires. Prose like "fade out, skip an hour" + # carries both a close signal and a skip directive; we want the + # close summary to capture the closing scene's final beat (and + # promote per-POV memories) before the time advances. Order + # matters: scene close -> skip narration -> time advance. + # + # When there's no active scene (or the prose carries no close + # signal) ``detect_scene_close`` returns the safe + # ``should_close=False`` default and we drop straight to the + # skip controller — same behavior as today, no extra cost. + skip_scene = active_scene(conn, chat_id) + if skip_scene is not None: + container = None + if skip_scene.get("container_id") is not None: + container = get_container(conn, skip_scene["container_id"]) + container_name = container["name"] if container else "unknown" + close_decision = await detect_scene_close( + client, + model=settings.classifier_model, + prose=prose, + current_container_name=container_name, + ) + if close_decision.should_close: + append_and_apply( + conn, + kind="scene_closed", + payload={ + "scene_id": skip_scene["id"], + "ended_at": chat.get("time"), + "significance": 0, + }, + ) + await apply_scene_close_summary( + conn, + client, + classifier_model=settings.classifier_model, + chat_id=chat_id, + scene_id=skip_scene["id"], + host_bot_id=host_bot["id"], + timeout_s=settings.classifier_timeout_s, + ) + # Derive ``new_time`` from the chat clock. Phase 3 stub: bump by # 1 hour. The drawer's elision form is the structured path when # the author wants a specific landing time; here the goal is @@ -333,11 +353,15 @@ async def post_turn( landing_state_hint=getattr(parsed, "landing_state_hint", "") or "", ) + except ChatNotFoundError as exc: + # Defensive: chat existence is checked above, so this only + # fires on a TOCTOU race where the chat row is deleted + # mid-request. T81 split the typed missing-chat case out of + # the generic ValueError so we keep the 404 mapping here. + raise HTTPException(status_code=404, detail=str(exc)) except ValueError as exc: - # The controller raises on missing chat / bad new_time. - # Missing chat is already handled above (we'd have 404'd); - # a bad new_time here is a stub-derivation bug rather than - # user input — surface as 400 with the controller message. + # Bad new_time is a stub-derivation bug rather than user + # input — surface as 400 with the controller message. raise HTTPException(status_code=400, detail=str(exc)) return Response(status_code=204) @@ -459,7 +483,11 @@ async def post_turn( # 7. Append the assistant_turn with the final text. (See note above on # why we skip ``project`` for these transcript-only event kinds.) - append_event( + # Capture the returned event id so we can stamp ``id="turn-"`` on + # the SSE-emitted HTML fragment — the chat-page ``turn_html_replace`` + # handler relies on the id to swap regenerated turns in-place + # (T86 follow-up). + primary_assistant_event_id = append_event( conn, kind="assistant_turn", payload={ @@ -559,6 +587,7 @@ async def post_turn( interjection_text: str | None = None interjection_speaker_id: str | None = None interjection_truncated = False + interjection_event_id: int | None = None if ( guest_bot is not None and not cancelled @@ -646,7 +675,9 @@ async def post_turn( interjection_text = "".join(interject_accumulated) - append_event( + # Capture the event id (T86 follow-up) so the SSE fragment + # below carries ``id="turn-"`` for in-place swap. + interjection_event_id = append_event( conn, kind="assistant_turn", payload={ @@ -878,6 +909,15 @@ async def post_turn( timeout_s=settings.classifier_timeout_s, ) + # 9a. Consume any pending meanwhile digests now that the assistant_turn + # (which surfaced them in its prompt via T65's helper) has landed. The + # spec's "first you-turn AFTER meanwhile close consumes the digest" + # semantics are preserved by running this AFTER scene-close detection + # — anything pending right now belongs to the prompt we just answered, + # so it's safe to mark consumed and the NEXT turn starts clean. + # Idempotent: re-calling produces zero events when nothing's pending. + consume_pending_meanwhile_digests(conn, chat_id) + # 10. Broadcast a JSON completion event (for JS consumers) and an HTML # fragment event (for HTMX SSE swap-into-timeline). One pair per # written assistant_turn so the timeline ends up with both the @@ -892,7 +932,10 @@ async def post_turn( }, ) primary_html = _render_turn_html( - addressee_bot["name"], primary_text, role="bot" + addressee_bot["name"], + primary_text, + role="bot", + event_id=primary_assistant_event_id, ) await publish( chat_id, {"event": "turn_html", "data": primary_html} @@ -916,7 +959,10 @@ async def post_turn( }, ) interject_html = _render_turn_html( - interject_speaker_name, interjection_text, role="bot" + interject_speaker_name, + interjection_text, + role="bot", + event_id=interjection_event_id, ) await publish( chat_id, {"event": "turn_html", "data": interject_html} diff --git a/tests/test_addressee.py b/tests/test_addressee.py index 71954cf..bb90d1b 100644 --- a/tests/test_addressee.py +++ b/tests/test_addressee.py @@ -97,3 +97,38 @@ async def test_classifier_failure_falls_back_to_host(): assert result.addressee_id == "bot_a" assert result.reason == "fallback" assert result.confidence == "low" + + +@pytest.mark.asyncio +async def test_invalid_confidence_value_falls_back_to_default(): + """Pydantic rejects ``confidence`` values outside the literal set + (``high`` / ``medium`` / ``low``). After the retry budget is + exhausted, classify returns the configured fallback default — + here that's ``confidence="low"`` with ``reason="fallback"``. + """ + canned = [ + json.dumps( + { + "addressee_id": "bot_a", + "confidence": "VERY_HIGH", + "reason": "out-of-range value", + } + ), + "still_bad", + "still_bad", + ] + client = MockLLMClient(canned=canned) + + result = await detect_addressee( + client, + classifier_model="test-model", + user_prose="anything", + host_id="bot_a", + host_name="BotA", + guest_id="bot_b", + guest_name="BotB", + ) + + assert result.addressee_id == "bot_a" + assert result.confidence == "low" + assert result.reason == "fallback" diff --git a/tests/test_drawer_events_threads_skip.py b/tests/test_drawer_events_threads_skip.py index ab1dafd..a621c2b 100644 --- a/tests/test_drawer_events_threads_skip.py +++ b/tests/test_drawer_events_threads_skip.py @@ -273,6 +273,43 @@ def test_post_skip_elision_advances_clock_and_emits_narration(client, tmp_path): assert tp["speaker_id"] == "bot_a" +def test_skip_route_404_via_typed_exception_class(client, tmp_path): + """T81: drawer skip routes 404 via :class:`ChatNotFoundError`. + + Pre-T81, the route caught ``ValueError`` and recovered the 404 case + by sniffing ``str(exc).startswith("chat not found")`` — fragile if + the message ever changed wording. The controller now raises a typed + exception so the route dispatches on type. Asserting the 404 from + the unseeded chat exercises the typed branch end-to-end; importing + the class confirms it's a real subclass of ``Exception`` and not a + re-export of ``ValueError`` (which would defeat the type split). + """ + # Don't seed any chat — the controller hits ``get_chat`` returning + # ``None`` and raises ``ChatNotFoundError``. The drawer route then + # maps that to ``404`` via the typed handler (no string sniff). + _override_llm([]) + try: + response = client.post( + "/chats/nonexistent/drawer/skip/elision", + data={ + "landing_state_hint": "x", + "new_time": "2026-04-26T20:30:00+00:00", + }, + ) + assert response.status_code == 404 + finally: + app.dependency_overrides.clear() + + # The exception class itself is importable, distinct from ValueError, + # and a proper Exception subclass — pinning the type-based dispatch + # so future refactors can't quietly collapse it back to a string sniff. + from chat.web.skip import ChatNotFoundError + + assert ChatNotFoundError is not None + assert issubclass(ChatNotFoundError, Exception) + assert not issubclass(ChatNotFoundError, ValueError) + + def test_post_skip_elision_invalid_time_returns_400(client, tmp_path): _seed_chat(tmp_path / "test.db") _override_llm([]) diff --git a/tests/test_meanwhile_turn_flow.py b/tests/test_meanwhile_turn_flow.py index 0290b2b..9549813 100644 --- a/tests/test_meanwhile_turn_flow.py +++ b/tests/test_meanwhile_turn_flow.py @@ -570,3 +570,173 @@ def test_meanwhile_turn_registered_in_in_flight_tasks( # Post-flight: the entry has been cleaned up so the next turn (or # the cancel route) doesn't see a stale task. assert "chat_bot_a" not in _in_flight_tasks + + +def test_meanwhile_turn_cancellation_via_route(app_state_setup, tmp_path): + """T85.2: a cancellation that fires while a meanwhile beat is + streaming truncates the assistant_turn and skips the post-turn + memory + state-update writes — the same end-to-end shape the + /turns/cancel route produces. + + Drives the cancel by hijacking ``client.stream`` to raise + CancelledError on its first iteration — the exact pattern proven + by ``test_cancelled_turn_still_closes_scene_when_user_prose_signals_close`` + in ``tests/test_turn_flow.py``. This mirrors what + ``cancel_turn`` does in production (``task.cancel()`` schedules a + CancelledError on the next await); doing the raise inline avoids + the TestClient-loop-reentry problem that prevents driving a second + POST mid-stream from the same synchronous test thread, while + exercising the same code path: the meanwhile streamer's + ``except asyncio.CancelledError`` block at meanwhile.py:276 sets + ``cancelled=True`` + ``truncated=True``, the assistant_turn lands + with the partial, and the memory/state-update branch is skipped. + + The ``_in_flight_tasks`` registration that wires the cancel route + to the meanwhile streamer is independently pinned by + ``test_meanwhile_turn_registered_in_in_flight_tasks`` above; this + test pins the downstream behavioural shape the registration + enables — together they cover the full Stop-button lifecycle for + meanwhile beats. + + Behavioural pins: + + * ``assistant_turn`` lands with ``truncated=True``, + ``meanwhile_scene_id=2``, ``speaker_id="bot_a"``. + * No ``memory_written`` events fire (cancel skips per-bot writes). + * No post-turn ``edge_update`` events fire (cancel skips state updates). + * ``_in_flight_tasks`` is empty post-flight. + """ + from typing import AsyncIterator, Sequence + + from chat.llm.client import Message + from chat.web.turns import _in_flight_tasks + + _seed_meanwhile_chat(tmp_path / "test.db") + + class _CancelOnStreamMock(MockLLMClient): + """Yields CancelledError on first iteration of ``stream`` — + simulates ``cancel_turn`` having fired ``task.cancel()`` on the + in-flight streaming task. ``generate`` is delegated to the + canned-queue base so parse_turn still resolves cleanly. + """ + + async def stream( + self, messages: Sequence[Message], *, model: str, **params + ) -> AsyncIterator[str]: + raise asyncio.CancelledError + yield # pragma: no cover — keeps this an async generator. + + canned_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they exchange a glance"}]} + ) + # Canned queue: only parse_turn — the narrative slot is never pulled + # because stream raises before consuming it, and post-turn + # state-update is skipped by the cancel branch. + mock = _CancelOnStreamMock(canned=[canned_parse]) + from chat.web.kickoff import get_llm_client + + app.dependency_overrides[get_llm_client] = lambda: mock + try: + # The meanwhile controller re-raises CancelledError after the + # partial assistant_turn is recorded (meanwhile.py:387). The + # outer post_turn route has no catch for CancelledError on the + # meanwhile path (turns.py:244-254 only catches ValueError), so + # the exception propagates up through Starlette. TestClient + # surfaces that as a 500 or a propagated exception depending on + # Starlette/asyncio versions; we don't pin the response. + try: + app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "they exchange a glance"}, + ) + except BaseException: + pass + finally: + app.dependency_overrides.clear() + + with open_db(tmp_path / "test.db") as conn: + assistant_rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'assistant_turn' ORDER BY id" + ).fetchall() + memory_count = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'" + ).fetchone()[0] + # Edge updates AFTER the assistant_turn (i.e. excluding seeded ones). + max_at_row = conn.execute( + "SELECT MAX(id) FROM event_log WHERE kind = 'assistant_turn'" + ).fetchone() + max_at = max_at_row[0] if max_at_row[0] is not None else 0 + post_turn_edge_updates = conn.execute( + "SELECT COUNT(*) FROM event_log " + "WHERE kind = 'edge_update' AND id > ?", + (max_at,), + ).fetchone()[0] + + # The cancelled assistant_turn was still recorded with truncated=True, + # carrying whatever partial text accumulated before cancel propagated + # (zero text here since the cancel hits on the first iteration). + assert len(assistant_rows) == 1 + payload = json.loads(assistant_rows[0][0]) + assert payload["truncated"] is True, payload + assert payload["meanwhile_scene_id"] == 2 + assert payload["speaker_id"] == "bot_a" + + # No per-bot memory writes — cancellation short-circuits the memory + # + state-update branch (see chat/web/meanwhile.py:308). + assert memory_count == 0 + + # No post-turn edge_updates — same short-circuit. + assert post_turn_edge_updates == 0 + + # Post-flight: registry cleared so the cancel route won't try to + # re-cancel a defunct task on a follow-up POST. + assert "chat_bot_a" not in _in_flight_tasks + + +def test_meanwhile_cancel_route_no_op_after_turn_completes( + app_state_setup, tmp_path +): + """T85.2: POST ``/chats//turns/cancel`` AFTER a meanwhile turn + has fully completed is a silent 204 no-op — there is no in-flight + task to cancel, the registry is empty, and the route must not error. + + Pins the cancel endpoint's robustness against the common-but-racy + sequence where the user clicks Stop just after the stream finished + (the SSE channel hasn't yet flipped the client-side ``isStreaming`` + flag). This is a complement to the snapshot test: the snapshot test + pins that the registry IS populated mid-flight, this test pins that + it isn't AFTER and that the route copes gracefully. + """ + from chat.web.turns import _in_flight_tasks + + _seed_meanwhile_chat(tmp_path / "test.db") + canned_parse = json.dumps( + {"segments": [{"kind": "narration", "text": "they exchange a glance"}]} + ) + canned = [ + canned_parse, + "BotA leans in. *quietly*", + _zero_state(), + _zero_state(), + ] + mock = _override_llm(canned) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "they exchange a glance"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + # Registry was cleaned up after the stream completed. + assert "chat_bot_a" not in _in_flight_tasks + + # Cancel after-the-fact: 204, no error, registry stays empty. + cancel_response = app_state_setup.post( + "/chats/chat_bot_a/turns/cancel" + ) + assert cancel_response.status_code == 204 + assert "chat_bot_a" not in _in_flight_tasks diff --git a/tests/test_memory_write.py b/tests/test_memory_write.py index 00243b0..77132ae 100644 --- a/tests/test_memory_write.py +++ b/tests/test_memory_write.py @@ -444,3 +444,91 @@ def test_record_for_present_dict_keys_match(tmp_path): narrative_text="Both bots witness this.", ) assert set(result_with_guest.keys()) == {"bot_a", "bot_b"} + + +# --------------------------------------------------------------------------- +# T84: unified record_turn_memory_for_present API with you_present kwarg. +# --------------------------------------------------------------------------- + + +def test_record_turn_memory_you_present_false_writes_meanwhile_witness_mask(tmp_path): + """When ``you_present=False`` the witness mask should be + ``[you=0, host=1, guest=1]`` for both bots — the meanwhile shape.""" + db = tmp_path / "t.db" + apply_migrations(db) + _seed_two_bots(db) + with open_db(db) as conn: + result = record_turn_memory_for_present( + conn, + chat_id="chat_ab", + host_bot_id="bot_a", + guest_bot_id="bot_b", + narrative_text="BotA and BotB confer privately.", + scene_id=None, + chat_clock_at="2026-04-26T20:00:00+00:00", + you_present=False, + ) + + assert set(result.keys()) == {"bot_a", "bot_b"} + + rows = conn.execute( + "SELECT owner_id, witness_you, witness_host, witness_guest " + "FROM memories ORDER BY owner_id" + ).fetchall() + assert len(rows) == 2 + for _owner, w_you, w_host, w_guest in rows: + assert w_you == 0 + assert w_host == 1 + assert w_guest == 1 + + # Two memory_written events were appended. + cur = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'" + ) + assert cur.fetchone()[0] == 2 + + +def test_record_turn_memory_you_present_true_default_writes_normal_witness_mask(tmp_path): + """Default ``you_present=True`` preserves Phase 2 behaviour: + ``witness_you=1`` for the host POV row.""" + db = tmp_path / "t.db" + apply_migrations(db) + _seed_minimal(db) + with open_db(db) as conn: + # No explicit you_present arg — should default to True. + result = record_turn_memory_for_present( + conn, + chat_id="chat_bot_a", + host_bot_id="bot_a", + guest_bot_id=None, + narrative_text="BotA hums to herself.", + ) + assert set(result.keys()) == {"bot_a"} + + row = conn.execute( + "SELECT witness_you, witness_host, witness_guest " + "FROM memories WHERE owner_id = 'bot_a'" + ).fetchone() + assert row is not None + w_you, w_host, w_guest = row + assert w_you == 1 + assert w_host == 1 + assert w_guest == 0 + + +def test_record_turn_memory_you_present_false_requires_guest(tmp_path): + """Calling with ``you_present=False`` and no ``guest_bot_id`` is a + programming error — meanwhile scenes always have both bots.""" + db = tmp_path / "t.db" + apply_migrations(db) + _seed_minimal(db) + with open_db(db) as conn: + with pytest.raises(ValueError, match="you_present=False requires guest_bot_id"): + record_turn_memory_for_present( + conn, + chat_id="chat_bot_a", + host_bot_id="bot_a", + guest_bot_id=None, + narrative_text="invalid", + you_present=False, + ) diff --git a/tests/test_per_pov_summary.py b/tests/test_per_pov_summary.py index 6984b5c..3ed22ac 100644 --- a/tests/test_per_pov_summary.py +++ b/tests/test_per_pov_summary.py @@ -1418,3 +1418,528 @@ def test_consumed_digest_does_not_render_again(tmp_path): body2 = msgs2[0].content assert "Meanwhile while you were away:" not in body2 assert digest_text not in body2 + + +# --------------------------------------------------------------------------- +# T80: scene_summarize polish bundle. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_scene_close_re_run_does_not_double_suffix(tmp_path): + """T80.1: re-running ``apply_scene_close_summary`` on the same scene + must NOT stack a second "Key quotes:" suffix on each pov_summary. The + builder strips any existing suffix from candidate text before + composing the new one, and the per-POV write replaces (not appends + to) the existing suffix. + """ + db = tmp_path / "t.db" + apply_migrations(db) + canned = json.dumps( + { + "summary": "BotA had a heavy talk with you.", + "knowledge_facts": [], + "relationship_summary": "Things shifted.", + } + ) + no_threads = json.dumps({"candidates": []}) + with open_db(db) as conn: + _seed_single_bot_scene_no_memory(conn) + # Significance >= 2 triggers the Key quotes suffix path. + _seed_memory(conn, pov_summary="Maya quote one", significance=3) + _seed_memory(conn, pov_summary="Maya quote two", significance=2) + project(conn) + + # First close. + client = MockLLMClient(canned=[canned, no_threads]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows = conn.execute( + "SELECT pov_summary FROM memories WHERE scene_id = 1" + ).fetchall() + assert rows + for (pov,) in rows: + assert pov.count("Key quotes:") == 1 + + # Second close on the same scene with fresh canned responses. + client2 = MockLLMClient(canned=[canned, no_threads]) + await apply_scene_close_summary( + conn, + client2, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows2 = conn.execute( + "SELECT pov_summary FROM memories WHERE scene_id = 1" + ).fetchall() + assert rows2 + for (pov,) in rows2: + # Still exactly ONE "Key quotes:" suffix — no recursive bloat. + assert pov.count("Key quotes:") == 1 + # And no nested-quote artifacts (the suffix wasn't sourced + # from a row whose text already contained the suffix). + inner_count = pov.count("Key quotes:") + assert inner_count == 1 + + +@pytest.mark.asyncio +async def test_thread_detection_uses_scene_scoped_transcript( + tmp_path, monkeypatch +): + """T80.2: when a chat has multiple closed scenes, the second scene's + close must hand ``detect_threads`` ONLY the second scene's turns — + not the chat-wide last-50, which would bleed in the first scene's + transcript and risk mis-closing threads.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + captured_transcripts: list[list[dict]] = [] + + async def capturing_detect_threads(client, **kwargs): + captured_transcripts.append(list(kwargs["scene_transcript"])) + return td_mod.ThreadDetectionResult() + + monkeypatch.setattr(td_mod, "detect_threads", capturing_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + # Seed scene 1 + 3 turns + close. + _seed_single_bot_scene(conn) + # Add two extra distinct turns inside scene 1 so the transcript + # has clearly-scene-1 markers we can assert on. + append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_bot_a", + "prose": "SCENE_ONE_USER_TURN", + "segments": [], + }, + ) + append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "SCENE_ONE_BOT_TURN", + "truncated": False, + "user_turn_id": 2, + }, + ) + project(conn) + + # Close scene 1. + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + # Open scene 2 with distinct dialogue. Use append_and_apply so + # the new events project incrementally without re-running the + # already-applied seed events. + from chat.eventlog.log import append_and_apply + + append_and_apply( + conn, + kind="scene_opened", + payload={ + "chat_id": "chat_bot_a", + "container_id": 1, + "started_at": "2026-04-26T21:00:00+00:00", + "participants": ["you", "bot_a"], + }, + ) + append_and_apply( + conn, + kind="memory_written", + payload={ + "owner_id": "bot_a", + "chat_id": "chat_bot_a", + "scene_id": 2, + "pov_summary": "Original (scene 2)", + "witness_you": 1, + "witness_host": 1, + "witness_guest": 0, + "significance": 1, + }, + ) + append_and_apply( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_bot_a", + "prose": "SCENE_TWO_USER_TURN", + "segments": [], + }, + ) + append_and_apply( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "SCENE_TWO_BOT_TURN", + "truncated": False, + "user_turn_id": 3, + }, + ) + + # Close scene 2. + client2 = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client2, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=2, + host_bot_id="bot_a", + ) + + # The second close's transcript holds only scene-2 markers. + assert len(captured_transcripts) == 2 + scene_two_transcript = captured_transcripts[1] + joined = " ".join(t.get("text", "") for t in scene_two_transcript) + assert "SCENE_TWO" in joined + assert "SCENE_ONE" not in joined + + +@pytest.mark.asyncio +async def test_detect_threads_failure_is_logged(tmp_path, monkeypatch, caplog): + """T80.3: when ``detect_threads`` raises, the broad except must log + the failure at DEBUG so a programmer-error flap surfaces in local + logs even though the close pipeline keeps moving.""" + import logging + + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def boom(client, **kwargs): + raise RuntimeError("test-detect-threads-boom") + + monkeypatch.setattr(td_mod, "detect_threads", boom) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + project(conn) + + caplog.set_level(logging.DEBUG, logger="chat.services.scene_summarize") + client = MockLLMClient(canned=[canned]) + # Close should NOT raise even though detect_threads did. + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + # Log carries the error message. + assert any( + "detect_threads failed" in rec.message + and "test-detect-threads-boom" in rec.message + for rec in caplog.records + ), [r.message for r in caplog.records] + + +@pytest.mark.asyncio +async def test_thread_closed_uses_chat_clock_time(tmp_path, monkeypatch): + """T80.4: emitted ``thread_closed`` events stamp ``closed_at`` with + the chat-clock time (chat["time"]), not the host's wall clock. The + rest of the close pipeline already does this; threads must agree + so timeline reconstruction stays consistent.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def fake_detect_threads(client, **kwargs): + return td_mod.ThreadDetectionResult( + candidates=[ + td_mod.ThreadCandidate( + action="close", + existing_thread_id="thr_x", + summary="resolved", + ), + ] + ) + + monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + # Pre-seed an open thread so the "close" candidate has something + # real to close, and pin the chat clock to a known value. + from chat.eventlog.log import append_and_apply + import chat.state.threads # noqa: F401 + + append_and_apply( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_x", + "chat_id": "chat_bot_a", + "title": "Lingering question", + "summary": "What did Maya hide?", + }, + ) + project(conn) + # UPDATE chat_state AFTER project so the re-projection doesn't + # overwrite the pinned clock value. + chat_clock = "2026-04-26T10:00:00+00:00" + conn.execute( + "UPDATE chat_state SET time = ? WHERE chat_id = ?", + (chat_clock, "chat_bot_a"), + ) + + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'thread_closed'" + ).fetchall() + assert len(rows) == 1 + payload = json.loads(rows[0][0]) + assert payload["thread_id"] == "thr_x" + assert payload["closed_at"] == chat_clock + + +# --------------------------------------------------------------------------- +# T80.5: T58 coverage gaps (truncation, thread update/close emissions). +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_key_quote_truncation_at_200_chars(tmp_path): + """T80.5: when a memory's pov_summary exceeds 200 chars, the + Key-quote bullet truncates the source text to exactly 200 chars + (no ellipsis — a hard slice, per the existing T58 implementation).""" + db = tmp_path / "t.db" + apply_migrations(db) + canned = json.dumps( + { + "summary": "BotA had a heavy talk.", + "knowledge_facts": [], + "relationship_summary": "Things shifted.", + } + ) + no_threads = json.dumps({"candidates": []}) + long_text = "X" * 500 # 500 X's; expected slice is 200 X's. + with open_db(db) as conn: + _seed_single_bot_scene_no_memory(conn) + _seed_memory(conn, pov_summary=long_text, significance=2) + project(conn) + + client = MockLLMClient(canned=[canned, no_threads]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + new_pov = conn.execute( + "SELECT pov_summary FROM memories WHERE scene_id = 1" + ).fetchone()[0] + assert "Key quotes:" in new_pov + # The bullet should contain exactly 200 X's, not 500. + # Format from _build_key_quotes_suffix: ``- ""``. + bullet_marker = '- "' + idx = new_pov.index(bullet_marker) + # Count consecutive X's after the bullet marker. + x_run = 0 + for ch in new_pov[idx + len(bullet_marker):]: + if ch == "X": + x_run += 1 + else: + break + assert x_run == 200, ( + f"expected 200-char truncation, got {x_run}" + ) + + +@pytest.mark.asyncio +async def test_thread_detection_update_candidate_emits_thread_updated( + tmp_path, monkeypatch +): + """T80.5: a detect_threads ``update`` candidate produces a + ``thread_updated`` event with the candidate's summary and a + last_referenced_scene_id pointing at the closed scene.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def fake_detect_threads(client, **kwargs): + return td_mod.ThreadDetectionResult( + candidates=[ + td_mod.ThreadCandidate( + action="update", + existing_thread_id="thr_x", + summary="updated summary", + ), + ] + ) + + monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + from chat.eventlog.log import append_and_apply + import chat.state.threads # noqa: F401 + + # Pre-seed the open thread so the update has a row to target. + append_and_apply( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_x", + "chat_id": "chat_bot_a", + "title": "Lingering question", + "summary": "old summary", + }, + ) + project(conn) + + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'thread_updated'" + ).fetchall() + assert len(rows) == 1 + payload = json.loads(rows[0][0]) + assert payload["thread_id"] == "thr_x" + assert payload["summary"] == "updated summary" + assert payload["last_referenced_scene_id"] == 1 + + +@pytest.mark.asyncio +async def test_thread_detection_close_candidate_emits_thread_closed( + tmp_path, monkeypatch +): + """T80.5: a detect_threads ``close`` candidate produces a + ``thread_closed`` event for the existing thread.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def fake_detect_threads(client, **kwargs): + return td_mod.ThreadDetectionResult( + candidates=[ + td_mod.ThreadCandidate( + action="close", + existing_thread_id="thr_x", + summary="resolved", + ), + ] + ) + + monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + from chat.eventlog.log import append_and_apply + import chat.state.threads # noqa: F401 + + append_and_apply( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_x", + "chat_id": "chat_bot_a", + "title": "Lingering question", + "summary": "open", + }, + ) + project(conn) + + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'thread_closed'" + ).fetchall() + assert len(rows) == 1 + payload = json.loads(rows[0][0]) + assert payload["thread_id"] == "thr_x" + # closed_at field is present (T80.4 verifies its value). + assert "closed_at" in payload diff --git a/tests/test_phase3_integration.py b/tests/test_phase3_integration.py index 11a9bb0..f01eb16 100644 --- a/tests/test_phase3_integration.py +++ b/tests/test_phase3_integration.py @@ -39,11 +39,11 @@ Cross-feature notes discovered while writing these tests: swallowed. Tests that don't care about thread coverage can omit the slot; test 2 includes a valid thread response to exercise the path. - ``consume_pending_meanwhile_digests`` is defined in chat.services.prompt - but is NOT currently wired into the post_turn flow. The digest stays - pending across turns until the helper is called explicitly. Test 4 - reflects this: it asserts the digest renders pre-consumption AND - post-consumption (driven via the helper directly), and that the - meanwhile_digest_consumed event lands in the event_log. + and is wired into the END of post_turn (after scene-close detection) + by T82.1. Test 4 still drives the helper directly because it asserts + the helper's contract in isolation (no post_turn round-trip in scope); + the explicit call doubles as defensive coverage and is idempotent — a + second call on already-consumed digests is a no-op. - The host-only ``apply_scene_close_summary`` canned queue layout is ``[host_pov, thread_detection]`` (2 slots) when a single bot is present and there are dialogue rows, with thread_detection being optional / @@ -769,10 +769,11 @@ def test_meanwhile_close_digest_surfaces_then_consumed( — the digest is gone, and a meanwhile_digest_consumed event landed. Cross-feature finding: ``consume_pending_meanwhile_digests`` is - defined in chat.services.prompt but is NOT wired into the post_turn - flow. The digest stays pending across turns until callers invoke - the helper. Test exercises the helper directly so the consumption - contract is pinned independent of any future post_turn integration. + defined in chat.services.prompt and wired into post_turn by T82.1 + (after scene-close detection). This test exercises the helper + directly so the consumption contract is pinned in isolation from + the post_turn round-trip; T82.1's wiring is covered by a dedicated + test in tests/test_turn_flow.py. Canned queue for the meanwhile turn: 1. parse_turn diff --git a/tests/test_prompt.py b/tests/test_prompt.py index e8fc30d..471d5b4 100644 --- a/tests/test_prompt.py +++ b/tests/test_prompt.py @@ -21,7 +21,7 @@ import chat.state.world # noqa: F401 import chat.state.events # noqa: F401 import chat.state.threads # noqa: F401 from chat.llm.client import Message -from chat.services.prompt import assemble_narrative_prompt +from chat.services.prompt import _witness_role_for, assemble_narrative_prompt def _seed_basic(conn) -> None: @@ -852,3 +852,10 @@ def test_assemble_with_open_thread_renders_block(tmp_path): body = msgs[0].content assert "Open threads:" in body assert "Maya's job hunt" in body + + +def test_witness_role_for_none_host_returns_host(): + assert _witness_role_for("bot_a", None) == "host" + # Sanity check: existing semantics preserved. + assert _witness_role_for("bot_a", "bot_a") == "host" + assert _witness_role_for("bot_a", "bot_b") == "guest" diff --git a/tests/test_regenerate.py b/tests/test_regenerate.py index 7fa22bc..d8a2d65 100644 --- a/tests/test_regenerate.py +++ b/tests/test_regenerate.py @@ -662,3 +662,356 @@ def test_regenerate_drops_interjection_when_classifier_returns_false( new_primary_payload = json.loads(cur[0][0]) assert new_primary_payload["text"] == "New primary text." assert "interjection_of" not in new_primary_payload + + +def test_regenerate_with_prior_lifecycle_logs_warning(tmp_path, monkeypatch, caplog): + """T83.4: when the superseded assistant_turn already produced + lifecycle transitions (event_started / event_completed / + event_cancelled), regenerate emits a WARNING naming the un-rolled- + back transitions. Phase 3.5 documents the gap; the actual rollback + is Phase 4 work. + """ + import asyncio + import logging + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.eventlog.log import append_and_apply + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + _ut_id, at_id = _seed_with_one_turn(db_path) + + # After the assistant_turn lands, simulate that the turn flow + # produced an event_completed transition. ``append_and_apply`` is + # the standard path so the events projection updates. + with open_db(db_path) as conn: + append_and_apply( + conn, + kind="event_planned", + payload={ + "event_id": "evt_x", + "chat_id": "chat_bot_a", + "kind": "story_event", + "props": {}, + "planned_for": "2026-04-30T18:00:00+00:00", + }, + ) + append_and_apply( + conn, + kind="event_started", + payload={ + "event_id": "evt_x", + "started_at": "2026-04-30T19:00:00+00:00", + }, + ) + completed_id = append_and_apply( + conn, + kind="event_completed", + payload={ + "event_id": "evt_x", + "completed_at": "2026-04-30T19:30:00+00:00", + }, + ) + assert completed_id is not None + + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + mock_client = MockLLMClient( + canned=["Refreshed reply.", state_canned, state_canned] + ) + settings = Settings(featherless_api_key="test") + + caplog.set_level(logging.WARNING, logger="chat.services.regenerate") + + with open_db(db_path) as conn: + asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_bot_a", + original_assistant_event_id=at_id, + ) + ) + + # The warning records the count and at least one of the affected + # event_log ids (event_started + event_completed = at minimum 2). + warnings = [ + r for r in caplog.records if r.levelname == "WARNING" + ] + matching = [w for w in warnings if "lifecycle transition" in w.getMessage()] + assert matching, ( + "expected a WARNING about un-rolled-back lifecycle transitions; " + f"got: {[w.getMessage() for w in warnings]}" + ) + msg = matching[0].getMessage() + # Reference the original superseded turn's id and the event_completed + # row's id. + assert str(at_id) in msg + assert str(completed_id) in msg + + +def test_regenerate_sibling_lookup_scoped_to_chat(tmp_path, monkeypatch): + """T83.3: regenerate's sibling-interjection lookup is scoped to the + chat being regenerated. + + Setup: TWO chats, each with a primary + interjection turn group whose + rows happen to share the same ``user_turn_id`` value (the projector + assigns event_log ids monotonically across the whole database, so + when each chat is seeded back-to-back the chat A primary lands on a + different ``user_turn_id`` than chat B's — but in older versions the + sibling query had no chat predicate, so it could in principle latch + onto a row from a different chat if ids collided in some unusual + flow). We construct the seeding so chat B's interjection has the + SAME ``interjection_of`` value as the chat A primary's speaker_id — + pre-T83.3 the global query could have picked it up. + + Assert: regenerating the chat A primary leaves chat B's rows + untouched (no supersede), and the regenerated chat A turn group's + interjection (the only one regenerate should regenerate) has its + ``regenerated_from`` pointing at the chat A original interjection, + not chat B's. + """ + import asyncio + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.services import regenerate as regenerate_module + from chat.services.interjection import InterjectionDecision + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + # Seed chat A's interjection group. + a_ut_id, a_primary_id, a_interjection_id = _seed_with_interjection_group( + db_path + ) + + # Seed chat B with the same shape but a different chat_id and bot + # ids, then add an interjection group whose ``interjection_of`` + # points at "bot_a" so a global (unscoped) query could collide. + with open_db(db_path) as conn: + for bot_id, name in (("bot_c", "BotC"), ("bot_d", "BotD")): + append_event( + conn, + kind="bot_authored", + payload={ + "id": bot_id, + "name": name, + "persona": "", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "", + "kickoff_prose": "", + }, + ) + append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_other", + "host_bot_id": "bot_c", + "guest_bot_id": "bot_d", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + b_ut_id = append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_other", + "prose": "different chat", + "segments": [], + }, + ) + b_primary_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_other", + "speaker_id": "bot_c", + "text": "Other primary.", + "truncated": False, + "user_turn_id": b_ut_id, + }, + ) + # The chat B interjection's ``interjection_of`` references + # "bot_a" — the chat A primary's speaker. Pre-T83.3 the global + # sibling query could mis-match this row. + b_interjection_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_other", + "speaker_id": "bot_d", + "text": "Cross-chat noise.", + "truncated": False, + "user_turn_id": b_ut_id, + "interjection_of": "bot_a", + }, + ) + + # Stub the interjection classifier to return True so the regenerate + # actively walks the sibling-discovery path. + async def _stub_should_interject(*_args, **_kwargs): + return InterjectionDecision(should_interject=True, reason="fired") + + monkeypatch.setattr( + regenerate_module, "detect_interjection", _stub_should_interject + ) + + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + canned: list[str] = ( + ["New chat A primary."] + + [state_canned] * 6 + + ["New chat A interjection."] + + [state_canned] * 6 + ) + mock_client = MockLLMClient(canned=list(canned)) + settings = Settings(featherless_api_key="test") + + with open_db(db_path) as conn: + new_text = asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_multi", + original_assistant_event_id=a_primary_id, + ) + ) + assert new_text == "New chat A primary." + + # Chat B rows are untouched — neither superseded nor referenced. + b_primary_super = conn.execute( + "SELECT superseded_by FROM event_log WHERE id = ?", + (b_primary_id,), + ).fetchone()[0] + b_interjection_super = conn.execute( + "SELECT superseded_by FROM event_log WHERE id = ?", + (b_interjection_id,), + ).fetchone()[0] + assert b_primary_super is None + assert b_interjection_super is None + + # Chat A's regenerated interjection has its ``regenerated_from`` + # pointing at chat A's original interjection — NOT chat B's. + cur = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'assistant_turn' " + " AND id NOT IN (?, ?, ?, ?) " + " AND superseded_by IS NULL", + (a_primary_id, a_interjection_id, b_primary_id, b_interjection_id), + ).fetchall() + # Two new rows: regenerated primary + regenerated interjection. + assert len(cur) == 2 + payloads = [json.loads(row[0]) for row in cur] + # Find the regenerated interjection (carries interjection_of). + new_interject_payloads = [ + p for p in payloads if p.get("interjection_of") + ] + assert len(new_interject_payloads) == 1 + assert new_interject_payloads[0]["regenerated_from"] == a_interjection_id + # Pin chat scope on every new row. + for p in payloads: + assert p["chat_id"] == "chat_multi" + + +def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch): + """T83.1: regenerate's streaming Task is registered in the chat-keyed + ``_in_flight_tasks`` dict so the /turns/cancel route can cancel a + mid-regenerate stream. Mirrors the meanwhile registration pattern + pinned by tests/test_meanwhile_turn_flow.py. + + Snapshot pattern: a custom MockLLMClient subclass captures the + presence of the chat_id in ``_in_flight_tasks`` at the first stream + yield (when the regenerate coroutine is awaiting our generator and + the task is alive). Post-flight, the entry must be cleaned up so the + next regenerate / turn registers a fresh task. + """ + import asyncio + from typing import AsyncIterator, Sequence + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.llm.client import Message + from chat.services.regenerate import regenerate_assistant_turn + from chat.web.turns import _in_flight_tasks + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + _ut_id, at_id = _seed_with_one_turn(db_path) + + in_flight_snapshot: dict = {} + + class _SnapshotMock(MockLLMClient): + async def stream( + self, messages: Sequence[Message], *, model: str, **params + ) -> AsyncIterator[str]: + text = self._canned.pop(0) + for i, ch in enumerate(text): + if i == 0: + in_flight_snapshot["present"] = ( + "chat_bot_a" in _in_flight_tasks + ) + in_flight_snapshot["task"] = _in_flight_tasks.get( + "chat_bot_a" + ) + yield ch + + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + mock_client = _SnapshotMock( + canned=["Refreshed reply.", state_canned, state_canned] + ) + + settings = Settings(featherless_api_key="test") + + # Pre-condition: registry empty for this chat. + assert "chat_bot_a" not in _in_flight_tasks + + with open_db(db_path) as conn: + new_text = asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_bot_a", + original_assistant_event_id=at_id, + ) + ) + assert new_text == "Refreshed reply." + + # Mid-flight: the streaming task was present in the registry, and + # the captured value was an asyncio.Task. + assert in_flight_snapshot.get("present") is True, ( + "_in_flight_tasks was empty at first yield — regenerate stream " + "isn't registering its task" + ) + assert isinstance(in_flight_snapshot.get("task"), asyncio.Task) + # Post-flight: the entry has been cleaned up. + assert "chat_bot_a" not in _in_flight_tasks diff --git a/tests/test_render.py b/tests/test_render.py index 23c263b..f49e98a 100644 --- a/tests/test_render.py +++ b/tests/test_render.py @@ -85,3 +85,26 @@ def test_render_prose_mixed_full_message(): assert 'looks up' in out # The apostrophe in ``she's`` is HTML-escaped to ``'``. assert '((she's tired))' in out + + +def test_render_turn_html_stamps_event_id_when_provided(): + """T86 follow-up: when ``event_id`` is supplied the wrapper DIV + carries ``id="turn-"`` so the chat-page + ``turn_html_replace`` SSE handler can locate the prior turn DOM + node by id and swap it in-place. Without the id the handler's + ``getElementById('turn-' + supersedes_id)`` lookup misses and + the regenerated turn appends instead of replaces. + """ + out = render_turn_html("BotA", "Hello.", role="bot", event_id=42) + assert 'id="turn-42"' in out + # The id must sit on the wrapper DIV, not somewhere nested inside. + assert out.startswith('
') + + +def test_render_turn_html_omits_id_when_event_id_missing(): + """Legacy callers (no ``event_id`` passed) get a clean DIV with no + id attribute — preserves the pre-T86 fragment shape. + """ + out = render_turn_html("BotA", "Hello.", role="bot") + assert "id=" not in out + assert out.startswith('
') diff --git a/tests/test_skip_narration.py b/tests/test_skip_narration.py index 577ddee..aa69fc6 100644 --- a/tests/test_skip_narration.py +++ b/tests/test_skip_narration.py @@ -98,6 +98,49 @@ class _RaisingMock: yield # pragma: no cover - make this a generator +class _RecordingMock: + """Mock LLMClient that records the kwargs passed to ``generate``. + + Used to assert that callers plumb through optional parameters like + ``timeout_s`` instead of swallowing them. Returns a fixed string so + the surrounding fallback path is not exercised. + """ + + def __init__(self) -> None: + self.captured_kwargs: dict | None = None + + async def generate( + self, messages: Sequence[Message], *, model: str, **params + ) -> str: + self.captured_kwargs = dict(params) + return "ok" + + async def stream( + self, messages: Sequence[Message], *, model: str, **params + ) -> AsyncIterator[str]: + raise RuntimeError("not used") + yield # pragma: no cover - make this a generator + + +@pytest.mark.asyncio +async def test_narrate_skip_passes_timeout_through(): + mock = _RecordingMock() + await narrate_skip( + mock, + narrative_model="x", + skip_kind="jump", + speaker_bot=_SPEAKER, + you_name="Me", + current_time="late evening", + new_time="next morning", + current_activity="winding down for the night", + landing_state_hint="having coffee in the kitchen", + timeout_s=12.5, + ) + assert mock.captured_kwargs is not None + assert mock.captured_kwargs.get("timeout_s") == 12.5 + + @pytest.mark.asyncio async def test_narrate_falls_back_on_generation_failure(): new_time = "next morning" diff --git a/tests/test_streaming_ux.py b/tests/test_streaming_ux.py index 45bf773..1745006 100644 --- a/tests/test_streaming_ux.py +++ b/tests/test_streaming_ux.py @@ -174,3 +174,74 @@ def test_chat_html_includes_stop_streaming_script(client, tmp_path): assert "stop-streaming" in body or "isStreaming" in body # Cancel route reference must be wired so the Stop button can call it. assert "/turns/cancel" in body + + +def test_chat_html_has_turn_html_replace_listener(client, tmp_path): + """T86: the chat shell wires a JS handler for the ``turn_html_replace`` + SSE event so regenerate-driven swaps land in connected tabs without a + page refresh. + + This is a presence / string-check test: it verifies the handler is + embedded in the rendered template but does NOT drive a real browser + (no headless runner is wired into this test environment). The end-to- + end behaviour — receiving the event over SSE and replacing the prior + turn's DOM node — is therefore not exercised here; a manual smoke + check or future browser-driven test would close that gap. + """ + _seed_chat(tmp_path / "test.db") + response = client.get("/chats/chat_bot_a") + assert response.status_code == 200 + body = response.text + # The handler must be wired against the SSE event name the backend + # publishes (chat.services.regenerate -> "turn_html_replace"). + assert "turn_html_replace" in body + # Confirm the handler reads the JSON payload's ``supersedes_id`` so + # it can locate the prior turn node. The exact lookup mechanism may + # vary, but the field name is part of the contract with the backend. + assert "supersedes_id" in body + + +def test_rendered_turn_html_includes_event_id(client, tmp_path): + """T86 follow-up: the chat-detail Jinja loop stamps + ``id="turn-"`` on every rendered turn DIV. Without this id + the ``turn_html_replace`` SSE handler's ``getElementById`` lookup + misses, falls through to ``insertAdjacentHTML('beforeend', …)``, and + the regenerated turn appears APPENDED instead of swapped in-place + (rendering the primary handler path dead code — exactly the gap the + T86 reviewer flagged). + + Seed a user_turn + assistant_turn, GET the chat page, and assert the + response body carries both turns' event ids on the wrapper DIVs. + """ + db_path = tmp_path / "test.db" + _seed_chat(db_path) + with open_db(db_path) as conn: + ut_id = append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_bot_a", + "prose": "hello bot", + "segments": [], + }, + ) + at_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "Hi there.", + "truncated": False, + "user_turn_id": ut_id, + }, + ) + conn.commit() + + response = client.get("/chats/chat_bot_a") + assert response.status_code == 200 + body = response.text + # Both seeded turns must carry ``id="turn-"`` so the SSE + # in-place swap can find them. + assert f'id="turn-{ut_id}"' in body + assert f'id="turn-{at_id}"' in body diff --git a/tests/test_turn_common.py b/tests/test_turn_common.py new file mode 100644 index 0000000..4788fde --- /dev/null +++ b/tests/test_turn_common.py @@ -0,0 +1,221 @@ +"""Shared turn helpers (T83.2). + +``chat.services.turn_common`` extracts two snippets that were duplicated +between ``chat.web.turns`` and ``chat.services.regenerate``: the recent +user-side / assistant_turn read, and the directed-pair edge gather for +the multi-pair state-update pass. These tests pin the helpers' behavior +independently of either call site. +""" + +from __future__ import annotations + +from chat.db.connection import open_db +from chat.db.migrate import apply_migrations +from chat.eventlog.log import append_event +from chat.eventlog.projector import project +from chat.services.turn_common import gather_prior_edges, read_recent_dialogue + + +def _seed_basic_chat(db_path): + """Seed bot + chat + a couple of edges + one round of user/assistant + turns. Returns ``(user_turn_id, assistant_turn_id)``. + """ + apply_migrations(db_path) + with open_db(db_path) as conn: + append_event( + conn, + kind="bot_authored", + payload={ + "id": "bot_a", + "name": "BotA", + "persona": "thoughtful", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "", + "kickoff_prose": "", + }, + ) + append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_a", + "host_bot_id": "bot_a", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + append_event( + conn, + kind="edge_update", + payload={ + "source_id": "bot_a", + "target_id": "you", + "chat_id": "chat_a", + "affinity_delta": 7, + "trust_delta": 3, + }, + ) + append_event( + conn, + kind="edge_update", + payload={ + "source_id": "you", + "target_id": "bot_a", + "chat_id": "chat_a", + "affinity_delta": 2, + "trust_delta": 1, + }, + ) + ut_id = append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_a", + "prose": "hello", + "segments": [], + }, + ) + at_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_a", + "speaker_id": "bot_a", + "text": "Original.", + "truncated": False, + "user_turn_id": ut_id, + }, + ) + project(conn) + return ut_id, at_id + + +def test_read_recent_dialogue_returns_chronological_pairs(tmp_path): + """``read_recent_dialogue`` returns oldest-first ``{speaker, text}`` + entries scoped to the requested chat. Speaker is "you" for user-side + rows and the assistant_turn's ``speaker_id`` for bot rows. + """ + db = tmp_path / "test.db" + _seed_basic_chat(db) + + with open_db(db) as conn: + out = read_recent_dialogue(conn, "chat_a", limit=10) + + # Each entry now carries the source ``event_log.id`` as ``event_id`` + # (T86 follow-up) so the chat-detail Jinja loop can stamp + # ``id="turn-"`` on each rendered turn DIV — needed by the + # ``turn_html_replace`` SSE handler for in-place regenerate swaps. + speakers = [(e["speaker"], e["text"]) for e in out] + assert speakers == [ + ("you", "hello"), + ("bot_a", "Original."), + ] + assert all("event_id" in e and isinstance(e["event_id"], int) for e in out) + + +def test_read_recent_dialogue_filters_superseded_and_other_chats(tmp_path): + """Superseded rows drop out (regenerate-aware). Rows scoped to a + different chat are also filtered. ``exclude_event_id`` excludes a + specific row even when it isn't superseded yet (regenerate uses this + to drop the original assistant_turn before the supersede UPDATE + lands). + """ + db = tmp_path / "test.db" + ut_id, at_id = _seed_basic_chat(db) + + with open_db(db) as conn: + # Append a second user/assistant pair. + ut_id2 = append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_a", + "prose": "how are you", + "segments": [], + }, + ) + at_id2 = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_a", + "speaker_id": "bot_a", + "text": "Second.", + "truncated": False, + "user_turn_id": ut_id2, + }, + ) + # And a row scoped to a different chat — must NOT appear. + append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "other_chat", + "prose": "should be filtered", + "segments": [], + }, + ) + # Mark the first assistant_turn as superseded — must drop out. + conn.execute( + "UPDATE event_log SET superseded_by = ? WHERE id = ?", + (at_id2, at_id), + ) + + out = read_recent_dialogue(conn, "chat_a", limit=10) + # First (superseded) assistant turn dropped; "other_chat" rows + # filtered; first user_turn still present. + speakers = [(e["speaker"], e["text"]) for e in out] + assert speakers == [ + ("you", "hello"), + ("you", "how are you"), + ("bot_a", "Second."), + ] + + # exclude_event_id drops at_id2 even though it's not superseded. + out2 = read_recent_dialogue( + conn, "chat_a", limit=10, exclude_event_id=at_id2 + ) + speakers2 = [(e["speaker"], e["text"]) for e in out2] + assert ("bot_a", "Second.") not in speakers2 + assert ("you", "how are you") in speakers2 + + # Ensure ut_id is still part of the dataset (sanity for the seed). + assert ut_id is not None + + +def test_gather_prior_edges_fills_missing_with_default(tmp_path): + """``gather_prior_edges`` returns one entry per directed pair across + ``present_ids``. Missing rows fall back to the schema default + 50/50 baseline; existing rows carry their stored values. + """ + db = tmp_path / "test.db" + _seed_basic_chat(db) + + with open_db(db) as conn: + out = gather_prior_edges(conn, ["bot_a", "you"]) + + # 2 entities -> 2 directed pairs (a->b and b->a, no self-pairs). + assert set(out.keys()) == {("bot_a", "you"), ("you", "bot_a")} + bot_to_you = out[("bot_a", "you")] + you_to_bot = out[("you", "bot_a")] + # Both edges seeded with deltas — they must reflect the projected + # affinity/trust (not the default 50/50). + assert bot_to_you["affinity"] == 57 # 50 + 7 + assert bot_to_you["trust"] == 53 # 50 + 3 + assert you_to_bot["affinity"] == 52 + assert you_to_bot["trust"] == 51 + + # A pair with no row yet falls back to 50/50. + with open_db(db) as conn: + out_with_missing = gather_prior_edges( + conn, ["bot_a", "you", "ghost_bot"] + ) + # 3 entities -> 6 directed pairs. + assert len(out_with_missing) == 6 + fallback = out_with_missing[("bot_a", "ghost_bot")] + assert fallback["affinity"] == 50 + assert fallback["trust"] == 50 + assert fallback["summary"] == "" diff --git a/tests/test_turn_flow.py b/tests/test_turn_flow.py index 80364ec..043fa78 100644 --- a/tests/test_turn_flow.py +++ b/tests/test_turn_flow.py @@ -1317,3 +1317,247 @@ def test_skip_command_does_not_run_narrative_classifier( "assemble_narrative_prompt was called on the skip path; the " "natural-language skip dispatch must bypass narrative assembly." ) + + +# --------------------------------------------------------------------------- +# Phase 3.5 (T82.1) — post_turn consumes pending meanwhile digests. +# +# The helper ``consume_pending_meanwhile_digests`` lives in +# chat.services.prompt and is now wired into the END of post_turn (after +# scene-close detection, before the response broadcast). This pins the +# wiring so future refactors don't accidentally drop the call and leave +# digests pending forever. +# --------------------------------------------------------------------------- + + +def test_post_turn_consumes_pending_meanwhile_digests( + app_state_setup, tmp_path +): + """Seed a pending meanwhile digest via ``meanwhile_digest_created``, + POST a regular you-turn through post_turn, and assert: + + 1. A ``meanwhile_digest_consumed`` event lands in the event_log. + 2. ``list_pending_meanwhile_digests`` returns empty after the turn. + + The post_turn flow surfaces the digest in the prompt (T65) and then + consumes it (T82.1) so the next turn starts clean. + """ + _seed(tmp_path / "test.db") + + db_path = tmp_path / "test.db" + # Seed a pending digest directly via the projection event. The scene_id + # field doesn't need to reference an existing meanwhile scene for the + # digest table — the FK is on the digest payload only. + with open_db(db_path) as conn: + append_and_apply( + conn, + kind="meanwhile_digest_created", + payload={ + "scene_id": 99, + "chat_id": "chat_bot_a", + "summary": "While you were away, the bots talked.", + }, + ) + # Confirm the digest is pending before the turn lands. + from chat.state.meanwhile import list_pending_meanwhile_digests + + assert len(list_pending_meanwhile_digests(conn, "chat_bot_a")) == 1 + + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "hello"}]} + ) + # Standard 4-slot queue: parse + narrative + 2 state-updates. No + # active scene so scene-close detection short-circuits without an LLM + # call (consistent with the no-guest regression test). + mock = _override_llm( + [canned_parse, "Hi there.", _zero_state(), _zero_state()] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "hello"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + # All canned slots drained — no extra classifier calls fired. + assert mock._canned == [] + + with open_db(db_path) as conn: + # A meanwhile_digest_consumed event landed for the seeded digest. + consumed_rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'meanwhile_digest_consumed' ORDER BY id" + ).fetchall() + assert len(consumed_rows) == 1 + + # The pending list is empty after consumption. + from chat.state.meanwhile import list_pending_meanwhile_digests + + assert list_pending_meanwhile_digests(conn, "chat_bot_a") == [] + + +# --------------------------------------------------------------------------- +# Phase 3.5 (T82.2) — natural-language skip runs scene close detection. +# +# A user typing "fade out, skip an hour" should close the scene FIRST +# (so the close summary captures the closing scene's final beat) and +# THEN run the elision skip. Without this wiring, the skip dispatch +# branch bypasses scene close entirely. +# --------------------------------------------------------------------------- + + +def test_natural_language_skip_with_close_signal_closes_scene( + app_state_setup, tmp_path +): + """Prose that hard-signals a close ("fade out, skip to morning") and + parses as ``intent=skip_elision`` must: + + 1. Land a ``scene_closed`` event before any skip event. + 2. Run ``apply_scene_close_summary`` for the closing scene. + 3. Land a ``time_skip_elision`` event AFTER the scene_closed. + + Order matters — the scene_closed id must be lower than the + time_skip_elision id in the event_log. + + Canned queue (single-bot, scene seeded, NO prior dialogue rows): + 1. parse_turn -> intent=skip_elision + 2. detect_scene_close -> should_close=True + 3. apply_scene_close_summary host POV + 4. narrate_skip narration + + detect_threads (T58.2 fires on every close) short-circuits when the + scene-scoped transcript is empty — in this test no user/assistant + turns landed in scene 1 before the close, so no thread-detection + slot is needed. + """ + # Seed an open scene so detect_scene_close has something to act on. + db_path = tmp_path / "test.db" + with open_db(db_path) as conn: + append_event( + conn, + kind="bot_authored", + payload={ + "id": "bot_a", + "name": "BotA", + "persona": "thoughtful, observant", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "", + "kickoff_prose": "...", + }, + ) + append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_bot_a", + "host_bot_id": "bot_a", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + append_event( + conn, + kind="container_created", + payload={ + "chat_id": "chat_bot_a", + "name": "office", + "type": "workplace", + "properties": {}, + }, + ) + append_event( + conn, + kind="scene_opened", + payload={ + "chat_id": "chat_bot_a", + "container_id": 1, + "started_at": "2026-04-26T20:00:00+00:00", + "participants": ["you", "bot_a"], + }, + ) + append_event( + conn, + kind="edge_update", + payload={ + "source_id": "bot_a", + "target_id": "you", + "chat_id": "chat_bot_a", + "knowledge_facts": ["coworker"], + }, + ) + for entity_id, verb in [("you", "talking"), ("bot_a", "listening")]: + append_event( + conn, + kind="activity_change", + payload={ + "entity_id": entity_id, + "posture": "sitting", + "action": { + "verb": verb, + "interruptible": True, + "required_attention": "low", + "expected_duration": "ongoing", + }, + "attention": "", + "holding": [], + "status": {}, + }, + ) + project(conn) + + canned_parse = json.dumps( + { + "segments": [ + {"kind": "narration", "text": "fade out, skip to morning"} + ], + "intent": "skip_elision", + "landing_state_hint": "morning at home", + } + ) + canned_close = json.dumps( + {"should_close": True, "reason": "fade out signaled"} + ) + canned_pov = json.dumps( + { + "summary": "BotA noticed the day winding down.", + "knowledge_facts": [], + "relationship_summary": "warmer", + } + ) + canned_narration = "The night fades and morning arrives." + mock = _override_llm( + [ + canned_parse, + canned_close, + canned_pov, + canned_narration, + ] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", + data={"prose": "fade out, skip to morning"}, + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + # All 4 canned slots drained — close + skip both ran end-to-end. + assert mock._canned == [] + + with open_db(db_path) as conn: + # scene_closed and time_skip_elision both landed. + scene_close_rows = conn.execute( + "SELECT id FROM event_log WHERE kind = 'scene_closed'" + ).fetchall() + skip_rows = conn.execute( + "SELECT id FROM event_log WHERE kind = 'time_skip_elision'" + ).fetchall() + assert len(scene_close_rows) == 1, "scene_closed must land" + assert len(skip_rows) == 1, "time_skip_elision must land" + # Order: scene close first, then skip. + assert scene_close_rows[0][0] < skip_rows[0][0], ( + "scene_closed must precede time_skip_elision in the event_log" + )