Phase 3.5 cleanup: 17-item backlog burndown #5

Merged
dohertj2 merged 33 commits from phase-3.5 into main 2026-04-27 01:56:29 -04:00
29 changed files with 2592 additions and 299 deletions
+34 -44
View File
@@ -204,15 +204,7 @@ Phase 2.5 cleanup shipped end-to-end across 8 tasks (T68T75). Two CLAUDE.md b
### Phase 2.6 / 3 backlog
New follow-ups discovered during Phase 2.5 execution. None are blocking; pick up at any time.
- **Frontend handler for `turn_html_replace` SSE event (from T73.1 review)**: regenerate's backend broadcast lands, but no live tab swaps the regenerated turn until a JS handler is wired. The existing `turn_html` event uses HTMX `sse-swap` to append; `turn_html_replace` ships JSON with `supersedes_id` for replacement semantics. Phase 2.6 should wire the JS to swap the prior turn's DOM node in place.
- **Cancel/stop hook for in-flight regenerate streams (from T73 review)**: `post_turn` registers stream tasks in `_in_flight_tasks` so the user can stop them. Regenerate doesn't. A user clicking "Stop" mid-regenerate has no cancel hook today.
- **DRY: regenerate vs post_turn (from T73 review)**: recent-dialogue assembly and prior-edges block are duplicated between `chat/services/regenerate.py` and `chat/web/turns.py`. Extract to shared helpers analogous to `_gather_state_update_inputs`.
- **Sibling-discovery query optimization (from T73 review)**: `regenerate.py`'s sibling-assistant-turn lookup scans all non-superseded `assistant_turn` rows globally. Adding a `chat_id` predicate via JSON extraction (or a denormalized column) bounds the cost to per-chat scale.
- **`_witness_role_for` defensive coding (from T71 review)**: helper returns `"guest"` when `host_bot_id is None`, which is wrong for Phase-1 chats. Defensive: `return "host" if host_bot_id is None or speaker_bot_id == host_bot_id else "guest"`. Not exercised by current tests; harden as a precaution.
- **Confidence type tightening (from T74 review)**: `chat/services/addressee.py::AddresseeDecision.confidence` could be typed as `Literal["high","medium","low"]` for stricter validation. Currently `str` with a comment.
- **Scene-close-on-cancel UX revisit**: T74.3 pinned the existing behavior (close fires even on cancel). If real play-testing surfaces a regression, revisit.
All items shipped — see Phase 3.5 status below.
## Phase 3 status
@@ -249,51 +241,49 @@ Phase 3 shipped end-to-end across 19 tasks (T49T67). Events with full lifecyc
### Phase 3.5 / 4 backlog
New follow-ups discovered during Phase 3 reviews and execution. None are blocking; pick up at any time.
All items shipped — see Phase 3.5 status below.
#### From T53 review
## Phase 3.5 status
- **`narrate_skip` `timeout_s` not piped through to `client.generate`**: parameter accepted but ignored. Fix: pass `timeout_s=timeout_s` to `client.generate(**...)`, or drop the parameter entirely if Featherless's client doesn't honor it.
Phase 3.5 cleanup shipped end-to-end across 12 tasks (T76T87). Two CLAUDE.md backlogs (Phase 2.6/3, Phase 3.5/4) are now empty; deferred follow-ups discovered during execution are tracked in a new "Phase 3.6 / 4 backlog" section below. Test count grew from 315 (Phase 3) to 343 (+28 new tests).
#### From T57 review
- **Wave 1 — trivial polish (parallel)**:
- **T76** `narrate_skip` `timeout_s` plumbed through to `client.generate`.
- **T77** `AddresseeDecision.confidence` typed as `Literal["high","medium","low"]`.
- **T78** `search_memories` docstring notes SQL-side significance bias (`SIGNIFICANCE_RANK_BIAS`).
- **T79** `_witness_role_for` defensive `host_bot_id is None` handling (returns `"host"` for Phase-1 chats).
- **Wave 2 — scene_summarize polish (single)**:
- **T80** five T58 follow-ups: re-close suffix bloat guard, transcript scoping by scene, swallowed-exception logging in `detect_threads`, chat-clock `closed_at`, and three new tests covering T58 gaps (200-char truncation, `thread_updated`/`thread_closed` candidate paths, try/except fallback).
- **Wave 3 — typed exception (single)**:
- **T81** `ChatNotFoundError` replaces string-prefix sniff in skip routes; mapped to 404 (vs 400 for other `ValueError` cases).
- **Wave 4 — turn-flow wiring (single)**:
- **T82** `consume_pending_meanwhile_digests` wired into `post_turn` (closes T66 gap; meanwhile digests no longer pile up); natural-language skip dispatch now runs scene close detection first.
- **Wave 5 — regenerate polish (single)**:
- **T83** five sub-fixes — cancel/stop hook (regenerate registers stream task in `_in_flight_tasks`); DRY extraction of `read_recent_dialogue` and `gather_prior_edges` into `chat/services/turn_common.py`; chat-scoped sibling-assistant-turn lookup; lifecycle-rollback warning log on regenerate; ordering-symmetry comment between post_turn and regenerate event-detection paths.
- **Wave 6 — final polish (parallel)**:
- **T84** unified `record_turn_memory` API with `you_present` kwarg; `record_meanwhile_memory` becomes a thin wrapper.
- **T85** JSON-build audit (no findings) + meanwhile cancel route-level test.
- **T86** frontend `turn_html_replace` SSE handler + turn_id stamping on rendered HTML so the in-place swap actually works.
- **`search_memories` docstring should mention SQL-side significance bias**: the function docstring still describes only the Python composite re-rank; add a one-line note about `SIGNIFICANCE_RANK_BIAS`.
### Phase 3.6 / 4 backlog
#### From T58 review
New follow-ups discovered during Phase 3.5 reviews and execution. None are blocking; pick up at any time.
- **Scene close re-close suffix bloat risk**: `_build_key_quotes_suffix` reads from `memories.pov_summary`. If a scene close runs twice, the second pass would read the rewritten text plus the previous "Key quotes:" suffix and append a second one. Either guard for double-suffix or source quotes from `event_log` `assistant_turn`/`user_turn` text instead.
- **Thread detection transcript scoping**: `_read_recent_dialogue` returns chat-wide history with no `scene_id` filter (Phase 1 turns lack one). Feeding chat-wide history to `detect_threads` will misattribute threads to the closing scene when the scene boundary falls inside the last 50 turns. Scope by `scene_id` once turns carry it, or by `started_at` against scene-open timestamp.
- **Swallowed exceptions in `detect_threads` try/except**: bare `Exception` swallows programmer errors silently. Log at debug level so silent regressions are recoverable.
- **Scene close `closed_at` clock divergence**: T58 uses `datetime.now(timezone.utc).isoformat()` instead of chat-clock time. Diverges from chat-clock semantics elsewhere; revisit if event reconstructions need chat-clock ordering.
- **Test coverage gaps in T58**: no test for 200-char quote truncation; no test for `thread_updated`/`thread_closed` candidate paths; no test for the `try/except` fallback.
#### From T80 review
#### From T61 review
- **`read_recent_dialogue` chat-id pushdown**: helper filters `chat_id` post-fetch in Python. Could push the `json_extract(payload_json, '$.chat_id') = ?` predicate into SQL (matching T83.3's pattern) for tighter LIMIT semantics. Currently a chat-with-many-other-chats can have its 50-row LIMIT consumed by foreign rows.
- **Lifecycle warning wording in regenerate**: T83.4's warning log lists ALL lifecycle event ids that exist after the original `assistant_turn` id, not just ones produced by the superseded turn. For the typical "regenerate the most recent" flow these are identical, but if a user regenerates an OLDER turn, the warning will list intervening-turn lifecycle events that legitimately stand. Tighten warning wording to "lifecycle transitions at-or-after turn X" (operator-friendly); a code-level fix would require a schema change to add explicit back-reference from lifecycle events to their producing turn.
- **Regenerate doesn't roll back lifecycle transitions from superseded turn**: `event_started`/`event_completed` rows from a superseded turn remain. Phase 3.5 should add a lifecycle-undo step. Caveat: regenerate-after-completion may double-emit promotion artifacts if the new text re-completes the same event.
- **Asymmetry in event-detection ordering**: post_turn runs lifecycle BETWEEN interjection and scene-close; regenerate runs lifecycle at the END. Benign because regenerate has no scene-close path, but worth tidying.
#### From T84 review
#### From T62 review
- **`record_turn_memory` legacy single-bot function** still exists alongside the unified `record_turn_memory_for_present`. Could be consolidated in a follow-up.
- **Error-message prefix sniff for 404 vs 400 routing**: drawer skip routes use `str(exc).startswith("chat not found")` to distinguish 404 from 400. Fragile if error wording changes. Use a typed exception subclass.
- **Skip command bypasses scene close detection**: a user typing "fade out, skip an hour" would skip without closing the scene. Acceptable for Phase 3 but worth noting.
#### From T86 fix-up
#### From T63 review
- **Test fixtures + `tests/test_phase3_integration.py`** that seed turns directly via `append_event`+`project` may need updating once any new test asserts the rendered HTML carries the new turn ids end-to-end. Existing tests pass because they don't read the stamped attribute, but they're brittle if the contract evolves.
- **`participants_json` JSON injection** (FIXED in T63 but worth noting in backlog as a "double-check other JSON-string-build sites" task): T63 originally used f-string interpolation; fixed to use `json.dumps`. Audit other state modules for similar patterns.
#### Deferred items (carry-over)
#### From T64 review
- **`record_meanwhile_memory` and `record_turn_memory_for_present` share private `_write_one_memory` helper**: minor DRY note; both helpers are similar enough that a unified API with a `you_present: bool` kwarg might be cleaner long-term.
- **Stop button cancellation for meanwhile turns**: T64 fix-up registered tasks in `_in_flight_tasks`; verify the `/turns/cancel` endpoint actually cancels meanwhile streams (the test pins registration but not the cancel-from-route path).
#### From cross-feature interactions discovered in Wave 6b merge
- **Cross-feature canned-queue brittleness**: meanwhile-scene close test required a canned response for T65's digest call after T64+T65 merge. Future close-path additions will keep extending the queue; consider a structured fixture builder rather than positional canned arrays.
#### From T66 integration tests
- **`consume_pending_meanwhile_digests` is defined but NOT wired into `post_turn`**: the helper lives in `chat/services/prompt.py` (T65) but `chat/web/turns.py` never calls it. Meanwhile digests stay pending forever in production. Phase 3.5 should call the helper after the first you-turn following a meanwhile close — probably right after the assistant_turn lands but before the next prompt assembly. Pinned by `tests/test_phase3_integration.py::test_meanwhile_close_digest_surfaces_then_consumed` which currently calls the helper directly.
#### Discovered during Phase 3 execution
- **`_witness_role_for` defensive `host_bot_id is None`** (carry-over from Phase 2.5 T71 backlog) — still pending.
- **Scene-close-on-cancel UX revisit** (Phase 2.5 carry-over): T74.3 pinned the existing behavior; revisit if real play-testing surfaces a regression.
- **Cross-feature canned-queue brittleness**: meanwhile-scene close test required a canned response for T65's digest call after T64+T65 merge. Future close-path additions will keep extending the queue. Consider a structured fixture builder rather than positional canned arrays. NOT addressed in Phase 3.5.
- **Lifecycle-transition rollback in regenerate**: T83.4 added a warning log; actual rollback (with proper schema linkage from lifecycle event back to producing turn) is Phase 4 work.
+3 -1
View File
@@ -22,6 +22,8 @@ from a fallback.
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel
from chat.llm.classify import classify
@@ -39,7 +41,7 @@ class AddresseeDecision(BaseModel):
"""
addressee_id: str
confidence: str = "medium" # "high" | "medium" | "low"
confidence: Literal["high", "medium", "low"] = "medium"
reason: str = ""
+34 -41
View File
@@ -134,17 +134,34 @@ def record_turn_memory_for_present(
chat_clock_at: str | None = None,
source: str = "direct",
significance: int = 1,
you_present: bool = True,
) -> dict[str, tuple[int, int | None]]:
"""Write a ``memory_written`` event for each present bot witness.
"""Single entry-point for per-turn memory writes (T84).
Host is always written. Guest is written iff ``guest_bot_id is not
None``. Witness flags are ``[you=1, host=1, guest=1]`` when a guest
is present, ``[you=1, host=1, guest=0]`` otherwise.
Writes one ``memory_written`` event per present bot witness. Host is
always written. Guest is written iff ``guest_bot_id is not None``.
Witness flags depend on ``you_present``:
- ``you_present=True`` (default — Phase 1/2/3 you-scenes): the user
is a witness. Mask is ``[you=1, host=1, guest=1]`` when a guest is
present, ``[you=1, host=1, guest=0]`` otherwise.
- ``you_present=False`` (Phase 3 meanwhile scenes): the user is
absent. Mask is ``[you=0, host=1, guest=1]`` for both bots. Both
``host_bot_id`` and ``guest_bot_id`` are required — a meanwhile
scene by definition has both bots, so passing ``guest_bot_id=None``
with ``you_present=False`` is a programming error and raises
:class:`ValueError`.
Returns a mapping ``{bot_id: (event_id, memory_id)}`` so callers can
look up the freshly-projected memory id per owner without re-querying
the database.
"""
if not you_present and guest_bot_id is None:
raise ValueError("you_present=False requires guest_bot_id")
witness_you = 1 if you_present else 0
witness_host = 1
witness_guest = 1 if guest_bot_id is not None else 0
result: dict[str, tuple[int, int | None]] = {}
@@ -153,8 +170,8 @@ def record_turn_memory_for_present(
owner_id=host_bot_id,
chat_id=chat_id,
narrative_text=narrative_text,
witness_you=1,
witness_host=1,
witness_you=witness_you,
witness_host=witness_host,
witness_guest=witness_guest,
scene_id=scene_id,
chat_clock_at=chat_clock_at,
@@ -167,8 +184,8 @@ def record_turn_memory_for_present(
owner_id=guest_bot_id,
chat_id=chat_id,
narrative_text=narrative_text,
witness_you=1,
witness_host=1,
witness_you=witness_you,
witness_host=witness_host,
witness_guest=1,
scene_id=scene_id,
chat_clock_at=chat_clock_at,
@@ -190,46 +207,22 @@ def record_meanwhile_memory(
source: str = "direct",
significance: int = 1,
) -> dict[str, tuple[int, int | None]]:
"""Write per-POV ``memory_written`` events for a meanwhile turn (T64).
"""Backward-compat thin wrapper for meanwhile memory writes (T64, T84).
A meanwhile scene runs entirely between host + guest, with "you"
absent. Both bots are present witnesses, so each one gets a row with
witness flags ``[you=0, host=1, guest=1]`` — different from the
normal-turn ``record_turn_memory_for_present`` shape, which assumes
the user is always a witness (``witness_you=1``).
The ``guest_bot_id`` is required (a meanwhile scene by definition
has both bots) — callers passing ``None`` is a programming error.
Returns ``{bot_id: (event_id, memory_id)}`` mirroring
:func:`record_turn_memory_for_present` so downstream queues
(significance scoring) can pull memory ids without re-querying.
Equivalent to calling :func:`record_turn_memory_for_present` with
``you_present=False``. Kept so existing call sites in
:mod:`chat.web.meanwhile` continue to work without churn. New code
should prefer the unified entry-point directly.
"""
result: dict[str, tuple[int, int | None]] = {}
result[host_bot_id] = _write_one_memory(
return record_turn_memory_for_present(
conn,
owner_id=host_bot_id,
chat_id=chat_id,
host_bot_id=host_bot_id,
guest_bot_id=guest_bot_id,
narrative_text=narrative_text,
witness_you=0,
witness_host=1,
witness_guest=1,
scene_id=scene_id,
chat_clock_at=chat_clock_at,
source=source,
significance=significance,
you_present=False,
)
result[guest_bot_id] = _write_one_memory(
conn,
owner_id=guest_bot_id,
chat_id=chat_id,
narrative_text=narrative_text,
witness_you=0,
witness_host=1,
witness_guest=1,
scene_id=scene_id,
chat_clock_at=chat_clock_at,
source=source,
significance=significance,
)
return result
+8 -1
View File
@@ -379,8 +379,15 @@ def _witness_role_for(speaker_bot_id: str, host_bot_id: str | None) -> str:
pinned the contract on ``search_memories``; this helper applies it
at the call site so a guest-as-speaker doesn't silently retrieve
memories under the wrong POV mask.
When ``host_bot_id`` is ``None`` (degenerate case from a half-seeded
chat or Phase-1 path), the speaker is treated as the host so the
query falls back to the host POV mask rather than silently masking
the speaker's own memories as a guest.
"""
return "host" if speaker_bot_id == host_bot_id else "guest"
if host_bot_id is None or speaker_bot_id == host_bot_id:
return "host"
return "guest"
def _resolve_addressee(
+189 -112
View File
@@ -68,7 +68,9 @@ Phase 2.5 changes:
from __future__ import annotations
import asyncio
import json
import logging
from sqlite3 import Connection
from chat.config import Settings
@@ -79,6 +81,10 @@ from chat.services.interjection import detect_interjection
from chat.services.memory_write import record_turn_memory_for_present
from chat.services.multi_state_update import compute_state_updates_for_present
from chat.services.prompt import assemble_narrative_prompt
from chat.services.turn_common import (
gather_prior_edges,
read_recent_dialogue,
)
from chat.state.edges import get_edge
from chat.state.entities import get_bot, get_you
from chat.state.events import list_active_events
@@ -86,6 +92,8 @@ from chat.state.world import active_scene, get_chat
from chat.web.pubsub import publish
from chat.web.render import render_turn_html
_log = logging.getLogger(__name__)
async def regenerate_assistant_turn(
conn: Connection,
@@ -104,6 +112,19 @@ async def regenerate_assistant_turn(
Raises :class:`ValueError` when the chat or the assistant_turn event
cannot be found — the FastAPI route translates this to 404.
.. note::
**Lifecycle-rollback limitation (T83.4, Phase 4 follow-up).**
When the superseded turn already produced lifecycle transitions
(``event_started`` / ``event_completed`` / ``event_cancelled``),
this function does NOT roll those rows back before re-running
``detect_event_transitions`` against the regenerated text. A
regenerate-after-completion can therefore double-emit promotion
artifacts if the new text re-completes the same event. Phase 3.5
only documents the gap and emits a WARNING log naming the
affected event_log ids; the actual undo pass is invasive
(re-projection / inverse-handler dispatch) and is deferred to
Phase 4. See the ``# T83.4`` block below for the warning emit.
"""
chat = get_chat(conn, chat_id)
if chat is None:
@@ -136,6 +157,40 @@ async def regenerate_assistant_turn(
original_assistant_payload = json.loads(row[0])
original_user_turn_id = original_assistant_payload.get("user_turn_id")
# T83.4: scan for downstream lifecycle transitions emitted by the
# superseded turn — they're not being rolled back (see method
# docstring). Heuristic: any ``event_started`` / ``event_completed``
# / ``event_cancelled`` event_log row with id strictly greater than
# the original assistant_turn's id was emitted as part of (or after)
# that turn's processing. Lifecycle events don't carry ``chat_id``
# in their payload (their payload references an ``event_id`` FK to
# the ``events`` table, which holds chat_id), so we join through
# ``events`` to scope to this chat.
#
# A WARNING log surfaces the affected event ids so operators can
# spot double-emit cases until the Phase 4 rollback pass lands.
unrolled_lifecycle = conn.execute(
"SELECT el.id, el.kind FROM event_log AS el "
"JOIN events AS ev "
" ON ev.event_id = json_extract(el.payload_json, '$.event_id') "
"WHERE el.kind IN ("
" 'event_started', 'event_completed', 'event_cancelled'"
" ) "
" AND ev.chat_id = ? "
" AND el.id > ? "
"ORDER BY el.id ASC",
(chat_id, original_assistant_event_id),
).fetchall()
if unrolled_lifecycle:
_log.warning(
"regenerate_assistant_turn: %d lifecycle transition(s) from "
"superseded turn %s are NOT being rolled back (Phase 4 "
"follow-up). Affected event ids: %s",
len(unrolled_lifecycle),
original_assistant_event_id,
[r[0] for r in unrolled_lifecycle],
)
# 1a. Look up any sibling interjection beat in the same turn group
# (T73.2). The original group is (primary + optional interjection),
# both pinned to the same ``user_turn_id``. The interjection has a
@@ -143,6 +198,13 @@ async def regenerate_assistant_turn(
# the silent witness (the bot that wasn't the primary addressee).
# Filter on ``superseded_by IS NULL`` so prior regenerates of this
# group don't reappear as siblings.
#
# T83.3: push the chat_id filter into SQL via ``json_extract`` so
# the query doesn't scan every assistant_turn row across the whole
# database. ``LIMIT 50`` bounds worst-case work even when chat_id
# isn't selective (e.g. a single chat with many turns) — we only
# need the one matching sibling. Mirrors the SQL pattern in
# ``chat.web.meanwhile._last_meanwhile_speaker``.
original_interjection_event_id: int | None = None
original_interjection_payload: dict | None = None
if original_user_turn_id is not None:
@@ -150,8 +212,11 @@ async def regenerate_assistant_turn(
"SELECT id, payload_json FROM event_log "
"WHERE kind = 'assistant_turn' "
" AND id != ? "
" AND superseded_by IS NULL",
(original_assistant_event_id,),
" AND superseded_by IS NULL "
" AND json_extract(payload_json, '$.chat_id') = ? "
"ORDER BY id DESC "
"LIMIT 50",
(original_assistant_event_id, chat_id),
)
for sib_id, sib_payload_json in sibling_cur.fetchall():
sib_payload = json.loads(sib_payload_json)
@@ -208,33 +273,30 @@ async def regenerate_assistant_turn(
# assistant_turn explicitly (we haven't superseded it yet — that
# update lands at the end so the new event_id is known) and use the
# standard ``superseded_by IS NULL AND hidden = 0`` filter so any
# prior regenerates also drop out.
# prior regenerates also drop out. T83.2: shared helper handles the
# SQL + filtering; we post-process to map speaker ids to display
# names for the prompt.
you_entity = get_you(conn) or {"name": "you", "persona": ""}
you_name = you_entity.get("name", "you")
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND id != ? "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT 20",
(original_assistant_event_id,),
raw_recent = read_recent_dialogue(
conn,
chat_id,
limit=20,
exclude_event_id=original_assistant_event_id,
)
rows = list(reversed(cur.fetchall()))
recent: list[dict] = []
for _eid, kind, payload_json in rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
for entry in raw_recent:
spk = entry.get("speaker", "bot")
if spk == "you":
recent.append({"speaker": you_name, "text": entry.get("text", "")})
continue
if kind in ("user_turn", "user_turn_edit"):
recent.append({"speaker": you_name, "text": p.get("prose", "")})
else:
spk = p.get("speaker_id", "bot")
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
elif guest_bot is not None and spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
recent.append({"speaker": spk_name, "text": p.get("text", "")})
elif guest_bot is not None and spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
else:
spk_name = host_bot.get("name", "bot")
recent.append({"speaker": spk_name, "text": entry.get("text", "")})
# 4. Assemble the narrative prompt. ``recent`` already excludes the
# current user prose, which we pass through ``user_turn_prose``.
@@ -250,19 +312,37 @@ async def regenerate_assistant_turn(
guest_id=guest_bot_id,
)
# 5. Stream the new narrative.
# 5. Stream the new narrative. T83.1: register the streaming Task in
# the chat-keyed in-flight registry so POST /chats/<id>/turns/cancel
# can call ``.cancel()`` on a mid-regenerate stream. We import the
# underscore name from turns.py deliberately — same single-process
# registry the cancel route reads, mirrors the meanwhile registration
# pattern in chat/web/meanwhile.py.
from chat.web.turns import _in_flight_tasks # noqa: PLC0415
accumulated: list[str] = []
async for chunk in client.stream(
messages,
model=settings.narrative_model,
max_tokens=settings.narrative_max_tokens,
temperature=settings.narrative_temperature,
):
accumulated.append(chunk)
await publish(
chat_id,
{"event": "token", "text": chunk, "speaker_id": speaker_bot_id},
)
async def _stream_primary() -> None:
async for chunk in client.stream(
messages,
model=settings.narrative_model,
max_tokens=settings.narrative_max_tokens,
temperature=settings.narrative_temperature,
):
accumulated.append(chunk)
await publish(
chat_id,
{"event": "token", "text": chunk, "speaker_id": speaker_bot_id},
)
stream_task = asyncio.create_task(_stream_primary())
_in_flight_tasks[chat_id] = stream_task
try:
await stream_task
finally:
# Always unregister so a subsequent turn / regenerate can register
# a fresh task. Mirrors the cleanup in turns.py::post_turn.
_in_flight_tasks.pop(chat_id, None)
new_text = "".join(accumulated)
# 6. Append the new assistant_turn event. ``user_turn_id`` points at
@@ -301,7 +381,10 @@ async def regenerate_assistant_turn(
speaker_bot.get("name", "bot") if speaker_bot is not None else "bot"
)
new_turn_html = render_turn_html(
speaker_name_for_render, new_text, role="bot"
speaker_name_for_render,
new_text,
role="bot",
event_id=new_assistant_event_id,
)
await publish(
chat_id,
@@ -354,17 +437,8 @@ async def regenerate_assistant_turn(
present_names[guest_bot_id] = guest_bot.get("name", "bot")
personas[guest_bot_id] = guest_bot.get("persona") or ""
prior_edges: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges[(src, tgt)] = edge
# T83.2: shared helper builds the directed-pair edge dict.
prior_edges = gather_prior_edges(conn, present_ids)
state_updates = await compute_state_updates_for_present(
client,
@@ -453,34 +527,27 @@ async def regenerate_assistant_turn(
)
if decision.should_interject:
# Re-read recent so the just-appended primary is in the prompt.
interject_cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT 20",
)
interject_rows = list(reversed(interject_cur.fetchall()))
# Re-read recent so the just-appended primary is in the
# prompt. T83.2: shared helper + the same id->name mapping
# as the primary read above.
raw_interject = read_recent_dialogue(conn, chat_id, limit=20)
interject_recent: list[dict] = []
for _eid, kind, payload_json in interject_rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
for entry in raw_interject:
spk = entry.get("speaker", "bot")
if spk == "you":
interject_recent.append(
{"speaker": you_name, "text": entry.get("text", "")}
)
continue
if kind in ("user_turn", "user_turn_edit"):
interject_recent.append(
{"speaker": you_name, "text": p.get("prose", "")}
)
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
elif spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
else:
spk = p.get("speaker_id", "bot")
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
elif spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
else:
spk_name = "bot"
interject_recent.append(
{"speaker": spk_name, "text": p.get("text", "")}
)
spk_name = "bot"
interject_recent.append(
{"speaker": spk_name, "text": entry.get("text", "")}
)
if interject_recent and interject_recent[-1].get("speaker") == you_name:
interject_recent = interject_recent[:-1]
@@ -497,21 +564,32 @@ async def regenerate_assistant_turn(
)
interject_accumulated: list[str] = []
async for chunk in client.stream(
interject_messages,
model=settings.narrative_model,
max_tokens=settings.narrative_max_tokens,
temperature=settings.narrative_temperature,
):
interject_accumulated.append(chunk)
await publish(
chat_id,
{
"event": "token",
"text": chunk,
"speaker_id": silent_witness_id,
},
)
async def _stream_interjection() -> None:
async for chunk in client.stream(
interject_messages,
model=settings.narrative_model,
max_tokens=settings.narrative_max_tokens,
temperature=settings.narrative_temperature,
):
interject_accumulated.append(chunk)
await publish(
chat_id,
{
"event": "token",
"text": chunk,
"speaker_id": silent_witness_id,
},
)
# T83.1: register the interjection sub-stream in the same
# in-flight registry so /turns/cancel collapses it too.
interject_task = asyncio.create_task(_stream_interjection())
_in_flight_tasks[chat_id] = interject_task
try:
await interject_task
finally:
_in_flight_tasks.pop(chat_id, None)
interject_text = "".join(interject_accumulated)
new_interjection_event_id = append_event(
@@ -541,7 +619,10 @@ async def regenerate_assistant_turn(
# Broadcast a replace event so connected tabs swap the prior
# interjection node in-place (mirrors T73.1's primary swap).
interject_html = render_turn_html(
silent_witness.get("name", "bot"), interject_text, role="bot"
silent_witness.get("name", "bot"),
interject_text,
role="bot",
event_id=new_interjection_event_id,
)
await publish(
chat_id,
@@ -573,17 +654,8 @@ async def regenerate_assistant_turn(
"text": interject_text,
}
]
prior_edges_post: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges_post[(src, tgt)] = edge
# T83.2: shared helper handles the directed-pair edge dict.
prior_edges_post = gather_prior_edges(conn, present_ids)
state_updates_post = await compute_state_updates_for_present(
client,
@@ -620,23 +692,28 @@ async def regenerate_assistant_turn(
(new_assistant_event_id, original_interjection_event_id),
)
# 10. Event-lifecycle detection (Phase 3, T61). Mirrors the post_turn
# block: classify whether any active events transitioned in the
# regenerated narrative and append the corresponding event_started /
# 9a. Event-lifecycle detection (Phase 3, T61). T83.5 cosmetic
# ordering: mirrors ``chat.web.turns.post_turn``'s 8a block — runs
# AFTER the interjection branch (and AFTER the post-interjection
# state-update + memory passes) so the classifier sees the same
# narrative-text input post_turn does. Numbering uses ``9a`` to
# match post_turn's ``8a`` shape (the interjection branch is step 9
# in regenerate vs step 8 in post_turn; lifecycle is the immediate
# follow-on in both). Behaviour identical to the prior ``step 10``
# placement — the block was already structurally last in regenerate
# because there's no scene-close pass here.
#
# Classify whether any active events transitioned in the regenerated
# narrative and append the corresponding event_started /
# event_completed / event_cancelled. ``promote_completed_event``
# runs inline after a completion so promotion artifacts land in the
# same regenerate path.
#
# Phase 3.5 follow-up: when a regenerate replaces a turn that had
# already produced event transitions, those original transitions are
# NOT undone here. The superseded ``assistant_turn`` group keeps its
# prior ``event_started`` / ``event_completed`` events in the log
# (they remain projected onto the events table). Phase 3.5 will add
# an "undo lifecycle" step to roll back the prior transitions before
# re-classifying the regenerated text. For v3 we accept that a
# regenerate-after-completion will double-emit promotion artifacts
# if the new text re-completes the same event — narratively rare,
# and a true fix needs the lifecycle-undo pass.
# T83.4 follow-up: when a regenerate replaces a turn that had
# already produced event transitions, those original transitions
# are NOT undone here (Phase 4 work). A WARNING log earlier in this
# function names the affected event_log ids — see the T83.4 block
# near the function entry.
new_active_events = list_active_events(conn, chat_id)
if new_active_events:
lifecycle_decision = await detect_event_transitions(
+155 -18
View File
@@ -29,6 +29,7 @@ keeps moving.
from __future__ import annotations
import json
import logging
import uuid
from datetime import datetime, timezone
from sqlite3 import Connection
@@ -39,6 +40,8 @@ from chat.eventlog.log import append_and_apply
from chat.llm.classify import classify
from chat.llm.client import LLMClient
_log = logging.getLogger(__name__)
class ScenePOVSummary(BaseModel):
"""Classifier output: one witness's view of a closing scene.
@@ -123,7 +126,11 @@ async def summarize_scene(
def _read_recent_dialogue(
conn: Connection, chat_id: str, *, limit: int = 50
conn: Connection,
chat_id: str,
*,
limit: int = 50,
since_event_id: int | None = None,
) -> list[dict]:
"""Pull the last ``limit`` user/assistant turns for ``chat_id``.
@@ -132,14 +139,29 @@ def _read_recent_dialogue(
the most recent turns of the chat. Superseded and hidden rows are
filtered out so regenerated turns (T29) don't bleed into the
summary.
T80.2: ``since_event_id`` clamps the result to event_log rows whose
``id >= since_event_id`` so callers needing a scene-scoped view (e.g.
thread detection on close) don't pull turns that landed before the
closing scene's ``scene_opened`` event.
"""
cur = conn.execute(
"SELECT kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
)
if since_event_id is None:
cur = conn.execute(
"SELECT kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
)
else:
cur = conn.execute(
"SELECT kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
" AND id >= ? "
"ORDER BY id DESC LIMIT ?",
(since_event_id, limit),
)
rows = list(reversed(cur.fetchall()))
out: list[dict] = []
for kind, payload_json in rows:
@@ -158,6 +180,65 @@ def _read_recent_dialogue(
return out
def _scene_opened_event_id(
conn: Connection, chat_id: str, scene_id: int
) -> int | None:
"""Return the event_log id of the ``scene_opened`` (or
``meanwhile_scene_started``) event that created scene row
``scene_id``. Used by T80.2 to lower-bound dialogue reads to a
single scene's transcript.
``meanwhile_scene_started`` carries an explicit ``scene_id`` so we
match on that directly. ``scene_opened`` doesn't, so we walk the
chat's scene rows in id order and zip against the chat's scene-open
events in id order — the projector creates one scene row per
scene-open event, so positions correspond.
Returns ``None`` when no matching event is found; callers should
treat that as "fall back to chat-wide" rather than over-filter.
"""
# Fast path for meanwhile children (explicit scene_id in payload).
for ev_id, payload_json in conn.execute(
"SELECT id, payload_json FROM event_log "
"WHERE kind = 'meanwhile_scene_started' "
" AND superseded_by IS NULL AND hidden = 0",
).fetchall():
try:
p = json.loads(payload_json)
except (TypeError, ValueError):
continue
if p.get("chat_id") == chat_id and p.get("scene_id") == scene_id:
return ev_id
# Fallback for parent you-scenes: zip chat-scoped scene-open events
# against chat-scoped scene rows in id order.
chat_scene_ids = [
r[0]
for r in conn.execute(
"SELECT id FROM scenes WHERE chat_id = ? ORDER BY id ASC",
(chat_id,),
).fetchall()
]
if scene_id not in chat_scene_ids:
return None
chat_open_evs: list[int] = []
for ev_id, _kind, payload_json in conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('scene_opened', 'meanwhile_scene_started') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id ASC",
).fetchall():
try:
p = json.loads(payload_json)
except (TypeError, ValueError):
continue
if p.get("chat_id") == chat_id:
chat_open_evs.append(ev_id)
idx = chat_scene_ids.index(scene_id)
if idx < len(chat_open_evs):
return chat_open_evs[idx]
return None
async def _summarize_and_apply_for_witness(
conn: Connection,
client: LLMClient,
@@ -213,7 +294,11 @@ async def _summarize_and_apply_for_witness(
# Empty default -> skip the memory rewrite; the seeded
# per-turn pov_summary stays in place.
continue
new_value = pov.summary + key_quotes_suffix
# T80.1: a prior close may have already appended a Key quotes
# suffix to this row's pov_summary. Strip it here so the fresh
# rewrite replaces the existing suffix rather than stacking a
# second one on top.
new_value = _strip_key_quotes_suffix(pov.summary) + key_quotes_suffix
append_and_apply(
conn,
kind="manual_edit",
@@ -263,6 +348,31 @@ async def _summarize_and_apply_for_witness(
return pov
# T80.1: header marker shared by the suffix builder and the
# witness-write strip step. Any text starting with this marker is treated
# as a previously-appended Key quotes suffix and stripped before reuse so
# repeated scene closes don't compose recursive bloat.
_KEY_QUOTES_HEADER = "\n\nKey quotes:\n"
def _strip_key_quotes_suffix(text: str) -> str:
"""Remove a previously-appended Key quotes suffix from ``text``.
Returns ``text`` unchanged when the marker is absent, or the prefix
up to (but not including) the marker when present. Used in two
places: (1) when sourcing quote text from a memory row that may
already carry the suffix from a prior close, and (2) when computing
the per-POV rewrite's prior_value so the new write replaces — rather
than stacks on — the old suffix.
"""
if not text:
return text
idx = text.find(_KEY_QUOTES_HEADER)
if idx >= 0:
return text[:idx]
return text
def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
"""If the scene's max-turn-significance is >= 2, build the
"Key quotes:" suffix from the top-3 highest-significance memory rows
@@ -274,6 +384,10 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
per-turn narrative seeded by T21, since this helper is called BEFORE
the per-POV rewrite. Texts are truncated to 200 chars to bound
memory row growth across many witnesses.
T80.1: candidate text is run through :func:`_strip_key_quotes_suffix`
first so a re-close (whose source memories already carry a suffix from
the prior close) doesn't quote a quote.
"""
row = conn.execute(
"SELECT MAX(significance) FROM memories WHERE scene_id = ?",
@@ -288,7 +402,7 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
(scene_id,),
)
quotes = [
(r[0] or "")[:200]
_strip_key_quotes_suffix(r[0] or "")[:200]
for r in cur.fetchall()
]
if not quotes:
@@ -454,20 +568,35 @@ async def apply_scene_close_summary(
},
)
# T58.2: thread detection on close. Reuses the dialogue we already
# gathered for per-POV summarization — same {speaker, text} shape
# detect_threads expects. Failure-tolerant: classify() returns the
# empty default on retry-exhaustion, and the broad except below
# protects the close pipeline from any other classifier/mock flap.
# T58.2: thread detection on close. Failure-tolerant: classify()
# returns the empty default on retry-exhaustion, and the broad except
# below protects the close pipeline from any other classifier/mock
# flap.
#
# T80.2: thread detection runs against a SCENE-SCOPED transcript,
# not the chat-wide last-50 turns used by the per-POV summaries.
# Mis-attributing threads when scene boundaries fall inside the last
# 50 turns would otherwise close threads opened in a prior scene.
scene_open_ev_id = _scene_opened_event_id(conn, chat_id, scene_id)
if scene_open_ev_id is not None:
scene_dialogue = _read_recent_dialogue(
conn, chat_id, since_event_id=scene_open_ev_id
)
else:
scene_dialogue = dialogue
try:
thread_result = await detect_threads(
client,
classifier_model=classifier_model,
scene_transcript=dialogue,
scene_transcript=scene_dialogue,
open_threads=list_open_threads(conn, chat_id),
timeout_s=timeout_s,
)
except Exception:
except Exception as exc:
# T80.3: log the swallowed exception at DEBUG so a
# programmer-error flap (e.g. wrong kwarg name) surfaces in
# local logs without breaking the close pipeline.
_log.debug("detect_threads failed: %s", exc, exc_info=True)
from chat.services.thread_detection import ThreadDetectionResult
thread_result = ThreadDetectionResult()
@@ -495,12 +624,20 @@ async def apply_scene_close_summary(
},
)
elif cand.action == "close" and cand.existing_thread_id:
# T80.4: chat-clock time, not wall clock — the rest of the
# close pipeline (memories, edges, scene_closed payloads)
# uses chat["time"] so threads must agree. Falls back to
# UTC now only when the chat row has no clock yet (defensive
# — chat_state always seeds "time" via chat_created).
chat_clock_at = chat.get("time") or datetime.now(
timezone.utc
).isoformat()
append_and_apply(
conn,
kind="thread_closed",
payload={
"thread_id": cand.existing_thread_id,
"closed_at": datetime.now(timezone.utc).isoformat(),
"closed_at": chat_clock_at,
},
)
+1
View File
@@ -96,6 +96,7 @@ async def narrate_skip(
model=narrative_model,
max_tokens=200,
temperature=0.7,
timeout_s=timeout_s,
)
text = (result or "").strip()
if not text:
+125
View File
@@ -0,0 +1,125 @@
"""Shared helpers for turn flows (T83.2).
Both ``chat.web.turns.post_turn`` and
``chat.services.regenerate.regenerate_assistant_turn`` need to:
1. Pull a chronological tail of user-side and assistant_turn events for
prompt assembly + state-update inputs.
2. Build a directed-edge dict over a fixed set of "present" entity ids
for the multi-pair state-update pass (with the schema 50/50 default
filled in for missing rows).
Before T83.2 each call site had its own copy of these blocks. The two
copies drifted on details (T73.1 added ``user_turn_edit`` handling to
turns.py; regenerate.py had a slightly different recent-window query).
This module is the single source so a future change to either lands in
both flows by construction.
Note on overlap with ``chat.services.scene_summarize._read_recent_dialogue``:
that helper has a ``since_event_id`` clamp (T80.2 thread-detection
scope) and intentionally does NOT include ``user_turn_edit`` events
its callers want the *original* prose, not edits. Deduplicating it
into here would either (a) require a new flag on the shared helper for
``user_turn_edit`` inclusion, or (b) silently change scene_summarize's
read shape. Both feel more invasive than the duplication is bad, so
that helper is left alone for now.
"""
from __future__ import annotations
import json
from sqlite3 import Connection
from chat.state.edges import get_edge
def read_recent_dialogue(
conn: Connection,
chat_id: str,
*,
limit: int = 50,
exclude_event_id: int | None = None,
) -> list[dict]:
"""Pull the last ``limit`` user-side / assistant_turn events for
``chat_id`` as ``[{"speaker": <id-or-"you">, "text": <prose>}]``,
chronologically ordered (oldest first).
Filters: ``superseded_by IS NULL AND hidden = 0`` regenerated
rows drop out so the timeline reflects the current state. Includes
``user_turn``, ``user_turn_edit`` (T29 edited prose substitutes for
the original the original is marked superseded above), and
``assistant_turn`` rows.
``exclude_event_id`` is an optional event_log id to skip used by
regenerate to drop the original assistant_turn from its prompt
context window before that row has been marked superseded (the
supersede UPDATE lands at the end so the new event_id is known).
"""
if exclude_event_id is None:
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
)
else:
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND id != ? "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(exclude_event_id, limit),
)
rows = list(reversed(cur.fetchall()))
out: list[dict] = []
for row_id, kind, payload_json in rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
continue
if kind in ("user_turn", "user_turn_edit"):
out.append(
{
"speaker": "you",
"text": p.get("prose", ""),
"event_id": row_id,
}
)
else:
out.append(
{
"speaker": p.get("speaker_id", "bot"),
"text": p.get("text", ""),
"event_id": row_id,
}
)
return out
def gather_prior_edges(
conn: Connection, present_ids: list[str]
) -> dict[tuple[str, str], dict]:
"""Build ``{(src, tgt): {affinity, trust, summary}}`` for every
directed pair where both ``src`` and ``tgt`` are in ``present_ids``
and ``src != tgt``.
Missing rows fall back to the schema default 50/50 baseline (mirrors
the Phase 1 single-pair flow). Used by post_turn and regenerate to
seed the multi-pair state-update classifier.
"""
prior_edges: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges[(src, tgt)] = edge
return prior_edges
__all__ = ["read_recent_dialogue", "gather_prior_edges"]
+10
View File
@@ -125,6 +125,16 @@ def search_memories(
so that stronger candidates yield smaller composite scores; the result is
sorted ascending and truncated to ``k``. The unmodified ``fts_rank`` and a
debug-friendly ``composite_score`` are kept on each returned dict.
The result ordering applies TWO independent significance boosts:
* **SQL-side** ``ORDER BY (rank - significance * SIGNIFICANCE_RANK_BIAS)``
pushes higher-significance memories ahead in the FTS5 candidate set so
the over-fetch already prefers them for tied / near-tied BM25 ranks
(T57, §11.1).
* **Python-side** a composite re-rank with ``_SIGNIFICANCE_WEIGHT``
reinforces the ordering after candidate retrieval, alongside the
recency boost above.
"""
if witness_role not in _VALID_WITNESS_ROLES:
raise ValueError(
+34 -1
View File
@@ -17,7 +17,7 @@
<p class="muted">No turns yet. Start typing below.</p>
{% else %}
{% for turn in turns %}
<div class="turn turn-{{ turn.role }}">
<div{% if turn.event_id is not none %} id="turn-{{ turn.event_id }}"{% endif %} class="turn turn-{{ turn.role }}">
<strong>{{ turn.speaker }}</strong>
{{ turn.text|render_prose|safe }}
</div>
@@ -119,6 +119,39 @@ document.querySelector('.drawer-toggle')?.addEventListener('click', (e) => {
}
});
// T86: live-swap regenerated turns. The backend (chat/services/
// regenerate.py) broadcasts a ``turn_html_replace`` SSE frame after
// appending the new assistant_turn — JSON payload of shape
// ``{data: <html>, turn_id: <new_id>, supersedes_id: <old_id>}``.
// We replace the prior turn's DOM node in-place when we can locate
// it by id, otherwise fall back to appending so a tab opened mid-
// regenerate still shows the new turn. The renderer
// (chat/web/render.py::render_turn_html) and the Jinja loop above
// both stamp ``id="turn-<event_id>"`` on each turn DIV, so the
// primary in-place swap path is the live one — the append fallback
// only kicks in when a tab opened AFTER the regenerate started (no
// prior turn DOM node to replace).
shell.addEventListener('htmx:sseMessage', (e) => {
if (e.detail.type !== 'turn_html_replace') return;
let data;
try { data = JSON.parse(e.detail.data); } catch (_) { return; }
const html = (data && data.data) || '';
const trimmed = html.trim();
if (!trimmed) return;
const oldNode = document.getElementById('turn-' + data.supersedes_id);
if (oldNode) {
const tmpl = document.createElement('template');
tmpl.innerHTML = trimmed;
const newNode = tmpl.content.firstChild;
if (newNode) oldNode.replaceWith(newNode);
} else {
// Fallback: append if the prior turn isn't in the DOM (e.g. user
// opened the tab AFTER the regenerate started, or the renderer
// hasn't yet stamped per-turn ids — see comment above).
timeline.insertAdjacentHTML('beforeend', trimmed);
}
});
// SSE connection lost — show a banner and unlock so the user can
// retry. The server commits the partial as truncated when its
// request.is_disconnected() poll trips (T19).
+20 -2
View File
@@ -52,12 +52,30 @@ async def chat_detail(chat_id: str, request: Request, conn=Depends(get_conn)):
raw_turns = _read_recent_dialogue(conn, chat_id, limit=200)
turns: list[dict] = []
for t in raw_turns:
# event_id is forwarded so the Jinja loop can stamp
# ``id="turn-<event_id>"`` on each rendered turn — the
# ``turn_html_replace`` SSE handler in chat.html relies on this
# id to swap a regenerated turn in-place (T86 follow-up).
if t["speaker"] == "you":
turns.append({"role": "you", "speaker": "you", "text": t["text"]})
turns.append(
{
"role": "you",
"speaker": "you",
"text": t["text"],
"event_id": t.get("event_id"),
}
)
else:
bot = get_bot(conn, t["speaker"])
label = bot["name"] if bot else t["speaker"]
turns.append({"role": "bot", "speaker": label, "text": t["text"]})
turns.append(
{
"role": "bot",
"speaker": label,
"text": t["text"],
"event_id": t.get("event_id"),
}
)
return TEMPLATES.TemplateResponse(
request,
+11 -8
View File
@@ -48,6 +48,7 @@ from chat.state.world import active_scene, get_activity, get_chat, get_container
from chat.web.bots import get_conn
from chat.web.kickoff import get_llm_client
from chat.web.skip import (
ChatNotFoundError,
_now_iso,
process_elision_skip,
process_jump_skip,
@@ -993,13 +994,12 @@ async def skip_elision(
new_time=new_time,
landing_state_hint=landing_state_hint,
)
except ChatNotFoundError as exc:
# Missing chat row: typed exception (T81) replaces the prior
# ``str(exc).startswith("chat not found")`` prefix sniff.
raise HTTPException(status_code=404, detail=str(exc))
except ValueError as exc:
# ``process_elision_skip`` raises on missing-chat or malformed /
# backwards new_time. The drawer used to 404 / 400 these
# separately — preserve the 404-vs-400 split by sniffing the
# error message so existing tests keep passing without changes.
if str(exc).startswith("chat not found"):
raise HTTPException(status_code=404, detail=str(exc))
# Input-validation failure (malformed or backwards new_time).
raise HTTPException(status_code=400, detail=str(exc))
return await drawer(chat_id, request, conn)
@@ -1037,9 +1037,12 @@ async def skip_jump(
notable_prose=notable_prose,
reset_activity=reset_flag,
)
except ChatNotFoundError as exc:
# Missing chat row: typed exception (T81) replaces the prior
# ``str(exc).startswith("chat not found")`` prefix sniff.
raise HTTPException(status_code=404, detail=str(exc))
except ValueError as exc:
if str(exc).startswith("chat not found"):
raise HTTPException(status_code=404, detail=str(exc))
# Input-validation failure (malformed or backwards new_time).
raise HTTPException(status_code=400, detail=str(exc))
return await drawer(chat_id, request, conn)
+6 -1
View File
@@ -378,7 +378,12 @@ async def process_meanwhile_turn(
"truncated": truncated,
},
)
turn_html = _render_turn_html(speaker_bot["name"], text, role="bot")
turn_html = _render_turn_html(
speaker_bot["name"],
text,
role="bot",
event_id=assistant_event_id,
)
await publish(chat_id, {"event": "turn_html", "data": turn_html})
if cancelled:
+15 -2
View File
@@ -84,7 +84,13 @@ def render_prose(text: str) -> str:
return "".join(f"<p>{p}</p>" for p in paragraphs)
def render_turn_html(speaker: str, text: str, role: str = "bot") -> str:
def render_turn_html(
speaker: str,
text: str,
role: str = "bot",
*,
event_id: int | None = None,
) -> str:
"""Render a full transcript turn as ``<div class="turn …">…</div>``.
Used by both the SSE fragment publisher in :mod:`chat.web.turns`
@@ -94,12 +100,19 @@ def render_turn_html(speaker: str, text: str, role: str = "bot") -> str:
``role`` selects the CSS class (``turn-you`` vs ``turn-bot``); the
speaker label and role name are HTML-escaped defensively even though
they currently come from trusted server-side state.
``event_id`` (T86 follow-up) stamps ``id="turn-<event_id>"`` on the
wrapper div so the chat-page ``turn_html_replace`` SSE handler can
locate the prior turn node by id and swap it in-place. When omitted
the id attribute is dropped so SSE-only fragments without a stable
event id (legacy callers) still render cleanly.
"""
speaker_html = html.escape(speaker)
role_html = html.escape(role)
body_html = render_prose(text)
id_attr = f' id="turn-{int(event_id)}"' if event_id is not None else ""
return (
f'<div class="turn turn-{role_html}">'
f'<div{id_attr} class="turn turn-{role_html}">'
f"<strong>{speaker_html}</strong>"
f"{body_html}"
f"</div>"
+21 -6
View File
@@ -36,6 +36,17 @@ from chat.state.entities import get_bot, get_you
from chat.state.world import get_activity, get_chat
class ChatNotFoundError(Exception):
"""Raised when a ``chat_id`` doesn't resolve to a chat row.
Distinguishes the missing-chat case from generic input-validation
failures (which still raise :class:`ValueError`). HTTP callers map
this to ``404`` and ``ValueError`` to ``400`` replacing the
earlier ``str(exc).startswith("chat not found")`` prefix sniff
(T81) with a typed dispatch.
"""
def _parse_iso_time(value: str) -> datetime | None:
"""Permissive ISO 8601 parser shared with the drawer routes (T59).
@@ -93,13 +104,14 @@ async def process_elision_skip(
..., "assistant_event_id": ...}`` so callers can introspect the
generated turn (e.g. for SSE rebroadcast or test assertions).
Raises ``ValueError`` on validation failure or when the chat row
can't be located (the drawer maps it to ``HTTP 400`` / ``404``
respectively; the natural-language path follows the same shape).
Raises :class:`ChatNotFoundError` when the chat row is missing
(HTTP ``404``) and ``ValueError`` on input-validation failure
(HTTP ``400``). Splitting the two lets the drawer route dispatch
on type instead of sniffing the error string (T81).
"""
chat = get_chat(conn, chat_id)
if chat is None:
raise ValueError(f"chat not found: {chat_id}")
raise ChatNotFoundError(f"chat not found: {chat_id}")
_validate_new_time(chat, new_time)
@@ -178,11 +190,13 @@ async def process_jump_skip(
Returns ``{"assistant_text": ..., "speaker_id": ..., "skip_event_id":
..., "assistant_event_id": ...}``.
Raises ``ValueError`` on validation failure (caller maps to ``400``).
Raises :class:`ChatNotFoundError` on missing chat (caller maps to
``404``) and ``ValueError`` on input-validation failure (caller maps
to ``400``).
"""
chat = get_chat(conn, chat_id)
if chat is None:
raise ValueError(f"chat not found: {chat_id}")
raise ChatNotFoundError(f"chat not found: {chat_id}")
_validate_new_time(chat, new_time)
@@ -280,6 +294,7 @@ def _now_iso() -> str:
__all__ = [
"ChatNotFoundError",
"process_elision_skip",
"process_jump_skip",
"_now_iso",
+98 -52
View File
@@ -64,10 +64,17 @@ from chat.services.event_promotion import promote_completed_event
from chat.services.interjection import detect_interjection
from chat.services.memory_write import record_turn_memory_for_present
from chat.services.multi_state_update import compute_state_updates_for_present
from chat.services.prompt import assemble_narrative_prompt
from chat.services.prompt import (
assemble_narrative_prompt,
consume_pending_meanwhile_digests,
)
from chat.services.rewind import compute_rewind_preview, execute_rewind
from chat.services.scene_close import detect_scene_close
from chat.services.scene_summarize import apply_scene_close_summary
from chat.services.turn_common import (
gather_prior_edges,
read_recent_dialogue,
)
from chat.services.turn_parse import ParsedTurn, parse_turn
from chat.state.edges import get_edge
from chat.state.entities import get_bot, get_you
@@ -79,7 +86,11 @@ from chat.web.kickoff import get_llm_client
from chat.web.meanwhile import process_meanwhile_turn
from chat.web.pubsub import publish
from chat.web.render import render_turn_html as _render_turn_html
from chat.web.skip import _parse_iso_time, process_elision_skip
from chat.web.skip import (
ChatNotFoundError,
_parse_iso_time,
process_elision_skip,
)
router = APIRouter()
@@ -106,38 +117,13 @@ def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str:
def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
"""Return user-side and assistant_turn events for ``chat_id``.
Includes ``user_turn``, ``user_turn_edit`` (T29 edited prose), and
``assistant_turn``. Ordered oldest-first; superseded/hidden rows are
skipped so regenerated turns (T29) drop out of the rendered timeline.
Each entry is shaped ``{"speaker": <id-or-"you">, "text": <prose>}``
for the prompt assembler and the chat-detail template.
T83.2: thin delegate over
:func:`chat.services.turn_common.read_recent_dialogue` so post_turn
and regenerate share one implementation. The wrapper survives so
the chat-detail template and other callers in this module don't all
have to update at once.
"""
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
)
rows = cur.fetchall()
rows.reverse() # back to chronological order
out: list[dict] = []
for _row_id, kind, payload_json in rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
continue
if kind in ("user_turn", "user_turn_edit"):
# Edited prose substitutes for the original user_turn (the
# original is marked superseded_by and filtered above).
out.append({"speaker": "you", "text": p.get("prose", "")})
else:
out.append(
{
"speaker": p.get("speaker_id", "bot"),
"text": p.get("text", ""),
}
)
return out
return read_recent_dialogue(conn, chat_id, limit=limit)
def _detect_addressee_id(
@@ -204,17 +190,8 @@ def _gather_state_update_inputs(
present_names[guest_bot["id"]] = guest_bot["name"]
personas[guest_bot["id"]] = guest_bot.get("persona") or ""
prior_edges: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges[(src, tgt)] = edge
# T83.2: directed-edge gather is shared with regenerate.py.
prior_edges = gather_prior_edges(conn, present_ids)
return present_ids, present_names, personas, prior_edges
@@ -310,6 +287,49 @@ async def post_turn(
)
if intent == "skip_elision":
# T82.2: run scene-close detection on the user's prose BEFORE
# the skip controller fires. Prose like "fade out, skip an hour"
# carries both a close signal and a skip directive; we want the
# close summary to capture the closing scene's final beat (and
# promote per-POV memories) before the time advances. Order
# matters: scene close -> skip narration -> time advance.
#
# When there's no active scene (or the prose carries no close
# signal) ``detect_scene_close`` returns the safe
# ``should_close=False`` default and we drop straight to the
# skip controller — same behavior as today, no extra cost.
skip_scene = active_scene(conn, chat_id)
if skip_scene is not None:
container = None
if skip_scene.get("container_id") is not None:
container = get_container(conn, skip_scene["container_id"])
container_name = container["name"] if container else "unknown"
close_decision = await detect_scene_close(
client,
model=settings.classifier_model,
prose=prose,
current_container_name=container_name,
)
if close_decision.should_close:
append_and_apply(
conn,
kind="scene_closed",
payload={
"scene_id": skip_scene["id"],
"ended_at": chat.get("time"),
"significance": 0,
},
)
await apply_scene_close_summary(
conn,
client,
classifier_model=settings.classifier_model,
chat_id=chat_id,
scene_id=skip_scene["id"],
host_bot_id=host_bot["id"],
timeout_s=settings.classifier_timeout_s,
)
# Derive ``new_time`` from the chat clock. Phase 3 stub: bump by
# 1 hour. The drawer's elision form is the structured path when
# the author wants a specific landing time; here the goal is
@@ -333,11 +353,15 @@ async def post_turn(
landing_state_hint=getattr(parsed, "landing_state_hint", "")
or "",
)
except ChatNotFoundError as exc:
# Defensive: chat existence is checked above, so this only
# fires on a TOCTOU race where the chat row is deleted
# mid-request. T81 split the typed missing-chat case out of
# the generic ValueError so we keep the 404 mapping here.
raise HTTPException(status_code=404, detail=str(exc))
except ValueError as exc:
# The controller raises on missing chat / bad new_time.
# Missing chat is already handled above (we'd have 404'd);
# a bad new_time here is a stub-derivation bug rather than
# user input — surface as 400 with the controller message.
# Bad new_time is a stub-derivation bug rather than user
# input — surface as 400 with the controller message.
raise HTTPException(status_code=400, detail=str(exc))
return Response(status_code=204)
@@ -459,7 +483,11 @@ async def post_turn(
# 7. Append the assistant_turn with the final text. (See note above on
# why we skip ``project`` for these transcript-only event kinds.)
append_event(
# Capture the returned event id so we can stamp ``id="turn-<n>"`` on
# the SSE-emitted HTML fragment — the chat-page ``turn_html_replace``
# handler relies on the id to swap regenerated turns in-place
# (T86 follow-up).
primary_assistant_event_id = append_event(
conn,
kind="assistant_turn",
payload={
@@ -559,6 +587,7 @@ async def post_turn(
interjection_text: str | None = None
interjection_speaker_id: str | None = None
interjection_truncated = False
interjection_event_id: int | None = None
if (
guest_bot is not None
and not cancelled
@@ -646,7 +675,9 @@ async def post_turn(
interjection_text = "".join(interject_accumulated)
append_event(
# Capture the event id (T86 follow-up) so the SSE fragment
# below carries ``id="turn-<n>"`` for in-place swap.
interjection_event_id = append_event(
conn,
kind="assistant_turn",
payload={
@@ -878,6 +909,15 @@ async def post_turn(
timeout_s=settings.classifier_timeout_s,
)
# 9a. Consume any pending meanwhile digests now that the assistant_turn
# (which surfaced them in its prompt via T65's helper) has landed. The
# spec's "first you-turn AFTER meanwhile close consumes the digest"
# semantics are preserved by running this AFTER scene-close detection
# — anything pending right now belongs to the prompt we just answered,
# so it's safe to mark consumed and the NEXT turn starts clean.
# Idempotent: re-calling produces zero events when nothing's pending.
consume_pending_meanwhile_digests(conn, chat_id)
# 10. Broadcast a JSON completion event (for JS consumers) and an HTML
# fragment event (for HTMX SSE swap-into-timeline). One pair per
# written assistant_turn so the timeline ends up with both the
@@ -892,7 +932,10 @@ async def post_turn(
},
)
primary_html = _render_turn_html(
addressee_bot["name"], primary_text, role="bot"
addressee_bot["name"],
primary_text,
role="bot",
event_id=primary_assistant_event_id,
)
await publish(
chat_id, {"event": "turn_html", "data": primary_html}
@@ -916,7 +959,10 @@ async def post_turn(
},
)
interject_html = _render_turn_html(
interject_speaker_name, interjection_text, role="bot"
interject_speaker_name,
interjection_text,
role="bot",
event_id=interjection_event_id,
)
await publish(
chat_id, {"event": "turn_html", "data": interject_html}
+35
View File
@@ -97,3 +97,38 @@ async def test_classifier_failure_falls_back_to_host():
assert result.addressee_id == "bot_a"
assert result.reason == "fallback"
assert result.confidence == "low"
@pytest.mark.asyncio
async def test_invalid_confidence_value_falls_back_to_default():
"""Pydantic rejects ``confidence`` values outside the literal set
(``high`` / ``medium`` / ``low``). After the retry budget is
exhausted, classify returns the configured fallback default
here that's ``confidence="low"`` with ``reason="fallback"``.
"""
canned = [
json.dumps(
{
"addressee_id": "bot_a",
"confidence": "VERY_HIGH",
"reason": "out-of-range value",
}
),
"still_bad",
"still_bad",
]
client = MockLLMClient(canned=canned)
result = await detect_addressee(
client,
classifier_model="test-model",
user_prose="anything",
host_id="bot_a",
host_name="BotA",
guest_id="bot_b",
guest_name="BotB",
)
assert result.addressee_id == "bot_a"
assert result.confidence == "low"
assert result.reason == "fallback"
+37
View File
@@ -273,6 +273,43 @@ def test_post_skip_elision_advances_clock_and_emits_narration(client, tmp_path):
assert tp["speaker_id"] == "bot_a"
def test_skip_route_404_via_typed_exception_class(client, tmp_path):
"""T81: drawer skip routes 404 via :class:`ChatNotFoundError`.
Pre-T81, the route caught ``ValueError`` and recovered the 404 case
by sniffing ``str(exc).startswith("chat not found")`` fragile if
the message ever changed wording. The controller now raises a typed
exception so the route dispatches on type. Asserting the 404 from
the unseeded chat exercises the typed branch end-to-end; importing
the class confirms it's a real subclass of ``Exception`` and not a
re-export of ``ValueError`` (which would defeat the type split).
"""
# Don't seed any chat — the controller hits ``get_chat`` returning
# ``None`` and raises ``ChatNotFoundError``. The drawer route then
# maps that to ``404`` via the typed handler (no string sniff).
_override_llm([])
try:
response = client.post(
"/chats/nonexistent/drawer/skip/elision",
data={
"landing_state_hint": "x",
"new_time": "2026-04-26T20:30:00+00:00",
},
)
assert response.status_code == 404
finally:
app.dependency_overrides.clear()
# The exception class itself is importable, distinct from ValueError,
# and a proper Exception subclass — pinning the type-based dispatch
# so future refactors can't quietly collapse it back to a string sniff.
from chat.web.skip import ChatNotFoundError
assert ChatNotFoundError is not None
assert issubclass(ChatNotFoundError, Exception)
assert not issubclass(ChatNotFoundError, ValueError)
def test_post_skip_elision_invalid_time_returns_400(client, tmp_path):
_seed_chat(tmp_path / "test.db")
_override_llm([])
+170
View File
@@ -570,3 +570,173 @@ def test_meanwhile_turn_registered_in_in_flight_tasks(
# Post-flight: the entry has been cleaned up so the next turn (or
# the cancel route) doesn't see a stale task.
assert "chat_bot_a" not in _in_flight_tasks
def test_meanwhile_turn_cancellation_via_route(app_state_setup, tmp_path):
"""T85.2: a cancellation that fires while a meanwhile beat is
streaming truncates the assistant_turn and skips the post-turn
memory + state-update writes the same end-to-end shape the
/turns/cancel route produces.
Drives the cancel by hijacking ``client.stream`` to raise
CancelledError on its first iteration the exact pattern proven
by ``test_cancelled_turn_still_closes_scene_when_user_prose_signals_close``
in ``tests/test_turn_flow.py``. This mirrors what
``cancel_turn`` does in production (``task.cancel()`` schedules a
CancelledError on the next await); doing the raise inline avoids
the TestClient-loop-reentry problem that prevents driving a second
POST mid-stream from the same synchronous test thread, while
exercising the same code path: the meanwhile streamer's
``except asyncio.CancelledError`` block at meanwhile.py:276 sets
``cancelled=True`` + ``truncated=True``, the assistant_turn lands
with the partial, and the memory/state-update branch is skipped.
The ``_in_flight_tasks`` registration that wires the cancel route
to the meanwhile streamer is independently pinned by
``test_meanwhile_turn_registered_in_in_flight_tasks`` above; this
test pins the downstream behavioural shape the registration
enables together they cover the full Stop-button lifecycle for
meanwhile beats.
Behavioural pins:
* ``assistant_turn`` lands with ``truncated=True``,
``meanwhile_scene_id=2``, ``speaker_id="bot_a"``.
* No ``memory_written`` events fire (cancel skips per-bot writes).
* No post-turn ``edge_update`` events fire (cancel skips state updates).
* ``_in_flight_tasks`` is empty post-flight.
"""
from typing import AsyncIterator, Sequence
from chat.llm.client import Message
from chat.web.turns import _in_flight_tasks
_seed_meanwhile_chat(tmp_path / "test.db")
class _CancelOnStreamMock(MockLLMClient):
"""Yields CancelledError on first iteration of ``stream`` —
simulates ``cancel_turn`` having fired ``task.cancel()`` on the
in-flight streaming task. ``generate`` is delegated to the
canned-queue base so parse_turn still resolves cleanly.
"""
async def stream(
self, messages: Sequence[Message], *, model: str, **params
) -> AsyncIterator[str]:
raise asyncio.CancelledError
yield # pragma: no cover — keeps this an async generator.
canned_parse = json.dumps(
{"segments": [{"kind": "narration", "text": "they exchange a glance"}]}
)
# Canned queue: only parse_turn — the narrative slot is never pulled
# because stream raises before consuming it, and post-turn
# state-update is skipped by the cancel branch.
mock = _CancelOnStreamMock(canned=[canned_parse])
from chat.web.kickoff import get_llm_client
app.dependency_overrides[get_llm_client] = lambda: mock
try:
# The meanwhile controller re-raises CancelledError after the
# partial assistant_turn is recorded (meanwhile.py:387). The
# outer post_turn route has no catch for CancelledError on the
# meanwhile path (turns.py:244-254 only catches ValueError), so
# the exception propagates up through Starlette. TestClient
# surfaces that as a 500 or a propagated exception depending on
# Starlette/asyncio versions; we don't pin the response.
try:
app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "they exchange a glance"},
)
except BaseException:
pass
finally:
app.dependency_overrides.clear()
with open_db(tmp_path / "test.db") as conn:
assistant_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'assistant_turn' ORDER BY id"
).fetchall()
memory_count = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'"
).fetchone()[0]
# Edge updates AFTER the assistant_turn (i.e. excluding seeded ones).
max_at_row = conn.execute(
"SELECT MAX(id) FROM event_log WHERE kind = 'assistant_turn'"
).fetchone()
max_at = max_at_row[0] if max_at_row[0] is not None else 0
post_turn_edge_updates = conn.execute(
"SELECT COUNT(*) FROM event_log "
"WHERE kind = 'edge_update' AND id > ?",
(max_at,),
).fetchone()[0]
# The cancelled assistant_turn was still recorded with truncated=True,
# carrying whatever partial text accumulated before cancel propagated
# (zero text here since the cancel hits on the first iteration).
assert len(assistant_rows) == 1
payload = json.loads(assistant_rows[0][0])
assert payload["truncated"] is True, payload
assert payload["meanwhile_scene_id"] == 2
assert payload["speaker_id"] == "bot_a"
# No per-bot memory writes — cancellation short-circuits the memory
# + state-update branch (see chat/web/meanwhile.py:308).
assert memory_count == 0
# No post-turn edge_updates — same short-circuit.
assert post_turn_edge_updates == 0
# Post-flight: registry cleared so the cancel route won't try to
# re-cancel a defunct task on a follow-up POST.
assert "chat_bot_a" not in _in_flight_tasks
def test_meanwhile_cancel_route_no_op_after_turn_completes(
app_state_setup, tmp_path
):
"""T85.2: POST ``/chats/<id>/turns/cancel`` AFTER a meanwhile turn
has fully completed is a silent 204 no-op there is no in-flight
task to cancel, the registry is empty, and the route must not error.
Pins the cancel endpoint's robustness against the common-but-racy
sequence where the user clicks Stop just after the stream finished
(the SSE channel hasn't yet flipped the client-side ``isStreaming``
flag). This is a complement to the snapshot test: the snapshot test
pins that the registry IS populated mid-flight, this test pins that
it isn't AFTER and that the route copes gracefully.
"""
from chat.web.turns import _in_flight_tasks
_seed_meanwhile_chat(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "narration", "text": "they exchange a glance"}]}
)
canned = [
canned_parse,
"BotA leans in. *quietly*",
_zero_state(),
_zero_state(),
]
mock = _override_llm(canned)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "they exchange a glance"},
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
assert mock._canned == []
# Registry was cleaned up after the stream completed.
assert "chat_bot_a" not in _in_flight_tasks
# Cancel after-the-fact: 204, no error, registry stays empty.
cancel_response = app_state_setup.post(
"/chats/chat_bot_a/turns/cancel"
)
assert cancel_response.status_code == 204
assert "chat_bot_a" not in _in_flight_tasks
+88
View File
@@ -444,3 +444,91 @@ def test_record_for_present_dict_keys_match(tmp_path):
narrative_text="Both bots witness this.",
)
assert set(result_with_guest.keys()) == {"bot_a", "bot_b"}
# ---------------------------------------------------------------------------
# T84: unified record_turn_memory_for_present API with you_present kwarg.
# ---------------------------------------------------------------------------
def test_record_turn_memory_you_present_false_writes_meanwhile_witness_mask(tmp_path):
"""When ``you_present=False`` the witness mask should be
``[you=0, host=1, guest=1]`` for both bots the meanwhile shape."""
db = tmp_path / "t.db"
apply_migrations(db)
_seed_two_bots(db)
with open_db(db) as conn:
result = record_turn_memory_for_present(
conn,
chat_id="chat_ab",
host_bot_id="bot_a",
guest_bot_id="bot_b",
narrative_text="BotA and BotB confer privately.",
scene_id=None,
chat_clock_at="2026-04-26T20:00:00+00:00",
you_present=False,
)
assert set(result.keys()) == {"bot_a", "bot_b"}
rows = conn.execute(
"SELECT owner_id, witness_you, witness_host, witness_guest "
"FROM memories ORDER BY owner_id"
).fetchall()
assert len(rows) == 2
for _owner, w_you, w_host, w_guest in rows:
assert w_you == 0
assert w_host == 1
assert w_guest == 1
# Two memory_written events were appended.
cur = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'"
)
assert cur.fetchone()[0] == 2
def test_record_turn_memory_you_present_true_default_writes_normal_witness_mask(tmp_path):
"""Default ``you_present=True`` preserves Phase 2 behaviour:
``witness_you=1`` for the host POV row."""
db = tmp_path / "t.db"
apply_migrations(db)
_seed_minimal(db)
with open_db(db) as conn:
# No explicit you_present arg — should default to True.
result = record_turn_memory_for_present(
conn,
chat_id="chat_bot_a",
host_bot_id="bot_a",
guest_bot_id=None,
narrative_text="BotA hums to herself.",
)
assert set(result.keys()) == {"bot_a"}
row = conn.execute(
"SELECT witness_you, witness_host, witness_guest "
"FROM memories WHERE owner_id = 'bot_a'"
).fetchone()
assert row is not None
w_you, w_host, w_guest = row
assert w_you == 1
assert w_host == 1
assert w_guest == 0
def test_record_turn_memory_you_present_false_requires_guest(tmp_path):
"""Calling with ``you_present=False`` and no ``guest_bot_id`` is a
programming error meanwhile scenes always have both bots."""
db = tmp_path / "t.db"
apply_migrations(db)
_seed_minimal(db)
with open_db(db) as conn:
with pytest.raises(ValueError, match="you_present=False requires guest_bot_id"):
record_turn_memory_for_present(
conn,
chat_id="chat_bot_a",
host_bot_id="bot_a",
guest_bot_id=None,
narrative_text="invalid",
you_present=False,
)
+525
View File
@@ -1418,3 +1418,528 @@ def test_consumed_digest_does_not_render_again(tmp_path):
body2 = msgs2[0].content
assert "Meanwhile while you were away:" not in body2
assert digest_text not in body2
# ---------------------------------------------------------------------------
# T80: scene_summarize polish bundle.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_scene_close_re_run_does_not_double_suffix(tmp_path):
"""T80.1: re-running ``apply_scene_close_summary`` on the same scene
must NOT stack a second "Key quotes:" suffix on each pov_summary. The
builder strips any existing suffix from candidate text before
composing the new one, and the per-POV write replaces (not appends
to) the existing suffix.
"""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk with you.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
# Significance >= 2 triggers the Key quotes suffix path.
_seed_memory(conn, pov_summary="Maya quote one", significance=3)
_seed_memory(conn, pov_summary="Maya quote two", significance=2)
project(conn)
# First close.
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows
for (pov,) in rows:
assert pov.count("Key quotes:") == 1
# Second close on the same scene with fresh canned responses.
client2 = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client2,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows2 = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows2
for (pov,) in rows2:
# Still exactly ONE "Key quotes:" suffix — no recursive bloat.
assert pov.count("Key quotes:") == 1
# And no nested-quote artifacts (the suffix wasn't sourced
# from a row whose text already contained the suffix).
inner_count = pov.count("Key quotes:")
assert inner_count == 1
@pytest.mark.asyncio
async def test_thread_detection_uses_scene_scoped_transcript(
tmp_path, monkeypatch
):
"""T80.2: when a chat has multiple closed scenes, the second scene's
close must hand ``detect_threads`` ONLY the second scene's turns —
not the chat-wide last-50, which would bleed in the first scene's
transcript and risk mis-closing threads."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
captured_transcripts: list[list[dict]] = []
async def capturing_detect_threads(client, **kwargs):
captured_transcripts.append(list(kwargs["scene_transcript"]))
return td_mod.ThreadDetectionResult()
monkeypatch.setattr(td_mod, "detect_threads", capturing_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
# Seed scene 1 + 3 turns + close.
_seed_single_bot_scene(conn)
# Add two extra distinct turns inside scene 1 so the transcript
# has clearly-scene-1 markers we can assert on.
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "SCENE_ONE_USER_TURN",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "SCENE_ONE_BOT_TURN",
"truncated": False,
"user_turn_id": 2,
},
)
project(conn)
# Close scene 1.
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Open scene 2 with distinct dialogue. Use append_and_apply so
# the new events project incrementally without re-running the
# already-applied seed events.
from chat.eventlog.log import append_and_apply
append_and_apply(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T21:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_and_apply(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 2,
"pov_summary": "Original (scene 2)",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"significance": 1,
},
)
append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "SCENE_TWO_USER_TURN",
"segments": [],
},
)
append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "SCENE_TWO_BOT_TURN",
"truncated": False,
"user_turn_id": 3,
},
)
# Close scene 2.
client2 = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client2,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=2,
host_bot_id="bot_a",
)
# The second close's transcript holds only scene-2 markers.
assert len(captured_transcripts) == 2
scene_two_transcript = captured_transcripts[1]
joined = " ".join(t.get("text", "") for t in scene_two_transcript)
assert "SCENE_TWO" in joined
assert "SCENE_ONE" not in joined
@pytest.mark.asyncio
async def test_detect_threads_failure_is_logged(tmp_path, monkeypatch, caplog):
"""T80.3: when ``detect_threads`` raises, the broad except must log
the failure at DEBUG so a programmer-error flap surfaces in local
logs even though the close pipeline keeps moving."""
import logging
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def boom(client, **kwargs):
raise RuntimeError("test-detect-threads-boom")
monkeypatch.setattr(td_mod, "detect_threads", boom)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
caplog.set_level(logging.DEBUG, logger="chat.services.scene_summarize")
client = MockLLMClient(canned=[canned])
# Close should NOT raise even though detect_threads did.
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Log carries the error message.
assert any(
"detect_threads failed" in rec.message
and "test-detect-threads-boom" in rec.message
for rec in caplog.records
), [r.message for r in caplog.records]
@pytest.mark.asyncio
async def test_thread_closed_uses_chat_clock_time(tmp_path, monkeypatch):
"""T80.4: emitted ``thread_closed`` events stamp ``closed_at`` with
the chat-clock time (chat["time"]), not the host's wall clock. The
rest of the close pipeline already does this; threads must agree
so timeline reconstruction stays consistent."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="close",
existing_thread_id="thr_x",
summary="resolved",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
# Pre-seed an open thread so the "close" candidate has something
# real to close, and pin the chat clock to a known value.
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "What did Maya hide?",
},
)
project(conn)
# UPDATE chat_state AFTER project so the re-projection doesn't
# overwrite the pinned clock value.
chat_clock = "2026-04-26T10:00:00+00:00"
conn.execute(
"UPDATE chat_state SET time = ? WHERE chat_id = ?",
(chat_clock, "chat_bot_a"),
)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_closed'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
assert payload["closed_at"] == chat_clock
# ---------------------------------------------------------------------------
# T80.5: T58 coverage gaps (truncation, thread update/close emissions).
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_key_quote_truncation_at_200_chars(tmp_path):
"""T80.5: when a memory's pov_summary exceeds 200 chars, the
Key-quote bullet truncates the source text to exactly 200 chars
(no ellipsis a hard slice, per the existing T58 implementation)."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
long_text = "X" * 500 # 500 X's; expected slice is 200 X's.
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
_seed_memory(conn, pov_summary=long_text, significance=2)
project(conn)
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
new_pov = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchone()[0]
assert "Key quotes:" in new_pov
# The bullet should contain exactly 200 X's, not 500.
# Format from _build_key_quotes_suffix: ``- "<text>"``.
bullet_marker = '- "'
idx = new_pov.index(bullet_marker)
# Count consecutive X's after the bullet marker.
x_run = 0
for ch in new_pov[idx + len(bullet_marker):]:
if ch == "X":
x_run += 1
else:
break
assert x_run == 200, (
f"expected 200-char truncation, got {x_run}"
)
@pytest.mark.asyncio
async def test_thread_detection_update_candidate_emits_thread_updated(
tmp_path, monkeypatch
):
"""T80.5: a detect_threads ``update`` candidate produces a
``thread_updated`` event with the candidate's summary and a
last_referenced_scene_id pointing at the closed scene."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="update",
existing_thread_id="thr_x",
summary="updated summary",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
# Pre-seed the open thread so the update has a row to target.
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "old summary",
},
)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_updated'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
assert payload["summary"] == "updated summary"
assert payload["last_referenced_scene_id"] == 1
@pytest.mark.asyncio
async def test_thread_detection_close_candidate_emits_thread_closed(
tmp_path, monkeypatch
):
"""T80.5: a detect_threads ``close`` candidate produces a
``thread_closed`` event for the existing thread."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="close",
existing_thread_id="thr_x",
summary="resolved",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "open",
},
)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_closed'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
# closed_at field is present (T80.4 verifies its value).
assert "closed_at" in payload
+10 -9
View File
@@ -39,11 +39,11 @@ Cross-feature notes discovered while writing these tests:
swallowed. Tests that don't care about thread coverage can omit the
slot; test 2 includes a valid thread response to exercise the path.
- ``consume_pending_meanwhile_digests`` is defined in chat.services.prompt
but is NOT currently wired into the post_turn flow. The digest stays
pending across turns until the helper is called explicitly. Test 4
reflects this: it asserts the digest renders pre-consumption AND
post-consumption (driven via the helper directly), and that the
meanwhile_digest_consumed event lands in the event_log.
and is wired into the END of post_turn (after scene-close detection)
by T82.1. Test 4 still drives the helper directly because it asserts
the helper's contract in isolation (no post_turn round-trip in scope);
the explicit call doubles as defensive coverage and is idempotent a
second call on already-consumed digests is a no-op.
- The host-only ``apply_scene_close_summary`` canned queue layout is
``[host_pov, thread_detection]`` (2 slots) when a single bot is present
and there are dialogue rows, with thread_detection being optional /
@@ -769,10 +769,11 @@ def test_meanwhile_close_digest_surfaces_then_consumed(
the digest is gone, and a meanwhile_digest_consumed event landed.
Cross-feature finding: ``consume_pending_meanwhile_digests`` is
defined in chat.services.prompt but is NOT wired into the post_turn
flow. The digest stays pending across turns until callers invoke
the helper. Test exercises the helper directly so the consumption
contract is pinned independent of any future post_turn integration.
defined in chat.services.prompt and wired into post_turn by T82.1
(after scene-close detection). This test exercises the helper
directly so the consumption contract is pinned in isolation from
the post_turn round-trip; T82.1's wiring is covered by a dedicated
test in tests/test_turn_flow.py.
Canned queue for the meanwhile turn:
1. parse_turn
+8 -1
View File
@@ -21,7 +21,7 @@ import chat.state.world # noqa: F401
import chat.state.events # noqa: F401
import chat.state.threads # noqa: F401
from chat.llm.client import Message
from chat.services.prompt import assemble_narrative_prompt
from chat.services.prompt import _witness_role_for, assemble_narrative_prompt
def _seed_basic(conn) -> None:
@@ -852,3 +852,10 @@ def test_assemble_with_open_thread_renders_block(tmp_path):
body = msgs[0].content
assert "Open threads:" in body
assert "Maya's job hunt" in body
def test_witness_role_for_none_host_returns_host():
assert _witness_role_for("bot_a", None) == "host"
# Sanity check: existing semantics preserved.
assert _witness_role_for("bot_a", "bot_a") == "host"
assert _witness_role_for("bot_a", "bot_b") == "guest"
+353
View File
@@ -662,3 +662,356 @@ def test_regenerate_drops_interjection_when_classifier_returns_false(
new_primary_payload = json.loads(cur[0][0])
assert new_primary_payload["text"] == "New primary text."
assert "interjection_of" not in new_primary_payload
def test_regenerate_with_prior_lifecycle_logs_warning(tmp_path, monkeypatch, caplog):
"""T83.4: when the superseded assistant_turn already produced
lifecycle transitions (event_started / event_completed /
event_cancelled), regenerate emits a WARNING naming the un-rolled-
back transitions. Phase 3.5 documents the gap; the actual rollback
is Phase 4 work.
"""
import asyncio
import logging
from chat.config import Settings
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_and_apply
from chat.services.regenerate import regenerate_assistant_turn
db_path = tmp_path / "test.db"
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
apply_migrations(db_path)
_ut_id, at_id = _seed_with_one_turn(db_path)
# After the assistant_turn lands, simulate that the turn flow
# produced an event_completed transition. ``append_and_apply`` is
# the standard path so the events projection updates.
with open_db(db_path) as conn:
append_and_apply(
conn,
kind="event_planned",
payload={
"event_id": "evt_x",
"chat_id": "chat_bot_a",
"kind": "story_event",
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
append_and_apply(
conn,
kind="event_started",
payload={
"event_id": "evt_x",
"started_at": "2026-04-30T19:00:00+00:00",
},
)
completed_id = append_and_apply(
conn,
kind="event_completed",
payload={
"event_id": "evt_x",
"completed_at": "2026-04-30T19:30:00+00:00",
},
)
assert completed_id is not None
state_canned = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
mock_client = MockLLMClient(
canned=["Refreshed reply.", state_canned, state_canned]
)
settings = Settings(featherless_api_key="test")
caplog.set_level(logging.WARNING, logger="chat.services.regenerate")
with open_db(db_path) as conn:
asyncio.run(
regenerate_assistant_turn(
conn,
mock_client,
settings=settings,
chat_id="chat_bot_a",
original_assistant_event_id=at_id,
)
)
# The warning records the count and at least one of the affected
# event_log ids (event_started + event_completed = at minimum 2).
warnings = [
r for r in caplog.records if r.levelname == "WARNING"
]
matching = [w for w in warnings if "lifecycle transition" in w.getMessage()]
assert matching, (
"expected a WARNING about un-rolled-back lifecycle transitions; "
f"got: {[w.getMessage() for w in warnings]}"
)
msg = matching[0].getMessage()
# Reference the original superseded turn's id and the event_completed
# row's id.
assert str(at_id) in msg
assert str(completed_id) in msg
def test_regenerate_sibling_lookup_scoped_to_chat(tmp_path, monkeypatch):
"""T83.3: regenerate's sibling-interjection lookup is scoped to the
chat being regenerated.
Setup: TWO chats, each with a primary + interjection turn group whose
rows happen to share the same ``user_turn_id`` value (the projector
assigns event_log ids monotonically across the whole database, so
when each chat is seeded back-to-back the chat A primary lands on a
different ``user_turn_id`` than chat B's — but in older versions the
sibling query had no chat predicate, so it could in principle latch
onto a row from a different chat if ids collided in some unusual
flow). We construct the seeding so chat B's interjection has the
SAME ``interjection_of`` value as the chat A primary's speaker_id —
pre-T83.3 the global query could have picked it up.
Assert: regenerating the chat A primary leaves chat B's rows
untouched (no supersede), and the regenerated chat A turn group's
interjection (the only one regenerate should regenerate) has its
``regenerated_from`` pointing at the chat A original interjection,
not chat B's.
"""
import asyncio
from chat.config import Settings
from chat.db.migrate import apply_migrations
from chat.services import regenerate as regenerate_module
from chat.services.interjection import InterjectionDecision
from chat.services.regenerate import regenerate_assistant_turn
db_path = tmp_path / "test.db"
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
apply_migrations(db_path)
# Seed chat A's interjection group.
a_ut_id, a_primary_id, a_interjection_id = _seed_with_interjection_group(
db_path
)
# Seed chat B with the same shape but a different chat_id and bot
# ids, then add an interjection group whose ``interjection_of``
# points at "bot_a" so a global (unscoped) query could collide.
with open_db(db_path) as conn:
for bot_id, name in (("bot_c", "BotC"), ("bot_d", "BotD")):
append_event(
conn,
kind="bot_authored",
payload={
"id": bot_id,
"name": name,
"persona": "",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_other",
"host_bot_id": "bot_c",
"guest_bot_id": "bot_d",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
b_ut_id = append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_other",
"prose": "different chat",
"segments": [],
},
)
b_primary_id = append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_other",
"speaker_id": "bot_c",
"text": "Other primary.",
"truncated": False,
"user_turn_id": b_ut_id,
},
)
# The chat B interjection's ``interjection_of`` references
# "bot_a" — the chat A primary's speaker. Pre-T83.3 the global
# sibling query could mis-match this row.
b_interjection_id = append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_other",
"speaker_id": "bot_d",
"text": "Cross-chat noise.",
"truncated": False,
"user_turn_id": b_ut_id,
"interjection_of": "bot_a",
},
)
# Stub the interjection classifier to return True so the regenerate
# actively walks the sibling-discovery path.
async def _stub_should_interject(*_args, **_kwargs):
return InterjectionDecision(should_interject=True, reason="fired")
monkeypatch.setattr(
regenerate_module, "detect_interjection", _stub_should_interject
)
state_canned = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
canned: list[str] = (
["New chat A primary."]
+ [state_canned] * 6
+ ["New chat A interjection."]
+ [state_canned] * 6
)
mock_client = MockLLMClient(canned=list(canned))
settings = Settings(featherless_api_key="test")
with open_db(db_path) as conn:
new_text = asyncio.run(
regenerate_assistant_turn(
conn,
mock_client,
settings=settings,
chat_id="chat_multi",
original_assistant_event_id=a_primary_id,
)
)
assert new_text == "New chat A primary."
# Chat B rows are untouched — neither superseded nor referenced.
b_primary_super = conn.execute(
"SELECT superseded_by FROM event_log WHERE id = ?",
(b_primary_id,),
).fetchone()[0]
b_interjection_super = conn.execute(
"SELECT superseded_by FROM event_log WHERE id = ?",
(b_interjection_id,),
).fetchone()[0]
assert b_primary_super is None
assert b_interjection_super is None
# Chat A's regenerated interjection has its ``regenerated_from``
# pointing at chat A's original interjection — NOT chat B's.
cur = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'assistant_turn' "
" AND id NOT IN (?, ?, ?, ?) "
" AND superseded_by IS NULL",
(a_primary_id, a_interjection_id, b_primary_id, b_interjection_id),
).fetchall()
# Two new rows: regenerated primary + regenerated interjection.
assert len(cur) == 2
payloads = [json.loads(row[0]) for row in cur]
# Find the regenerated interjection (carries interjection_of).
new_interject_payloads = [
p for p in payloads if p.get("interjection_of")
]
assert len(new_interject_payloads) == 1
assert new_interject_payloads[0]["regenerated_from"] == a_interjection_id
# Pin chat scope on every new row.
for p in payloads:
assert p["chat_id"] == "chat_multi"
def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch):
"""T83.1: regenerate's streaming Task is registered in the chat-keyed
``_in_flight_tasks`` dict so the /turns/cancel route can cancel a
mid-regenerate stream. Mirrors the meanwhile registration pattern
pinned by tests/test_meanwhile_turn_flow.py.
Snapshot pattern: a custom MockLLMClient subclass captures the
presence of the chat_id in ``_in_flight_tasks`` at the first stream
yield (when the regenerate coroutine is awaiting our generator and
the task is alive). Post-flight, the entry must be cleaned up so the
next regenerate / turn registers a fresh task.
"""
import asyncio
from typing import AsyncIterator, Sequence
from chat.config import Settings
from chat.db.migrate import apply_migrations
from chat.llm.client import Message
from chat.services.regenerate import regenerate_assistant_turn
from chat.web.turns import _in_flight_tasks
db_path = tmp_path / "test.db"
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
apply_migrations(db_path)
_ut_id, at_id = _seed_with_one_turn(db_path)
in_flight_snapshot: dict = {}
class _SnapshotMock(MockLLMClient):
async def stream(
self, messages: Sequence[Message], *, model: str, **params
) -> AsyncIterator[str]:
text = self._canned.pop(0)
for i, ch in enumerate(text):
if i == 0:
in_flight_snapshot["present"] = (
"chat_bot_a" in _in_flight_tasks
)
in_flight_snapshot["task"] = _in_flight_tasks.get(
"chat_bot_a"
)
yield ch
state_canned = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
mock_client = _SnapshotMock(
canned=["Refreshed reply.", state_canned, state_canned]
)
settings = Settings(featherless_api_key="test")
# Pre-condition: registry empty for this chat.
assert "chat_bot_a" not in _in_flight_tasks
with open_db(db_path) as conn:
new_text = asyncio.run(
regenerate_assistant_turn(
conn,
mock_client,
settings=settings,
chat_id="chat_bot_a",
original_assistant_event_id=at_id,
)
)
assert new_text == "Refreshed reply."
# Mid-flight: the streaming task was present in the registry, and
# the captured value was an asyncio.Task.
assert in_flight_snapshot.get("present") is True, (
"_in_flight_tasks was empty at first yield — regenerate stream "
"isn't registering its task"
)
assert isinstance(in_flight_snapshot.get("task"), asyncio.Task)
# Post-flight: the entry has been cleaned up.
assert "chat_bot_a" not in _in_flight_tasks
+23
View File
@@ -85,3 +85,26 @@ def test_render_prose_mixed_full_message():
assert '<em class="action">looks up</em>' in out
# The apostrophe in ``she's`` is HTML-escaped to ``&#x27;``.
assert '<span class="ooc">((she&#x27;s tired))</span>' in out
def test_render_turn_html_stamps_event_id_when_provided():
"""T86 follow-up: when ``event_id`` is supplied the wrapper DIV
carries ``id="turn-<event_id>"`` so the chat-page
``turn_html_replace`` SSE handler can locate the prior turn DOM
node by id and swap it in-place. Without the id the handler's
``getElementById('turn-' + supersedes_id)`` lookup misses and
the regenerated turn appends instead of replaces.
"""
out = render_turn_html("BotA", "Hello.", role="bot", event_id=42)
assert 'id="turn-42"' in out
# The id must sit on the wrapper DIV, not somewhere nested inside.
assert out.startswith('<div id="turn-42" class="turn turn-bot">')
def test_render_turn_html_omits_id_when_event_id_missing():
"""Legacy callers (no ``event_id`` passed) get a clean DIV with no
id attribute preserves the pre-T86 fragment shape.
"""
out = render_turn_html("BotA", "Hello.", role="bot")
assert "id=" not in out
assert out.startswith('<div class="turn turn-bot">')
+43
View File
@@ -98,6 +98,49 @@ class _RaisingMock:
yield # pragma: no cover - make this a generator
class _RecordingMock:
"""Mock LLMClient that records the kwargs passed to ``generate``.
Used to assert that callers plumb through optional parameters like
``timeout_s`` instead of swallowing them. Returns a fixed string so
the surrounding fallback path is not exercised.
"""
def __init__(self) -> None:
self.captured_kwargs: dict | None = None
async def generate(
self, messages: Sequence[Message], *, model: str, **params
) -> str:
self.captured_kwargs = dict(params)
return "ok"
async def stream(
self, messages: Sequence[Message], *, model: str, **params
) -> AsyncIterator[str]:
raise RuntimeError("not used")
yield # pragma: no cover - make this a generator
@pytest.mark.asyncio
async def test_narrate_skip_passes_timeout_through():
mock = _RecordingMock()
await narrate_skip(
mock,
narrative_model="x",
skip_kind="jump",
speaker_bot=_SPEAKER,
you_name="Me",
current_time="late evening",
new_time="next morning",
current_activity="winding down for the night",
landing_state_hint="having coffee in the kitchen",
timeout_s=12.5,
)
assert mock.captured_kwargs is not None
assert mock.captured_kwargs.get("timeout_s") == 12.5
@pytest.mark.asyncio
async def test_narrate_falls_back_on_generation_failure():
new_time = "next morning"
+71
View File
@@ -174,3 +174,74 @@ def test_chat_html_includes_stop_streaming_script(client, tmp_path):
assert "stop-streaming" in body or "isStreaming" in body
# Cancel route reference must be wired so the Stop button can call it.
assert "/turns/cancel" in body
def test_chat_html_has_turn_html_replace_listener(client, tmp_path):
"""T86: the chat shell wires a JS handler for the ``turn_html_replace``
SSE event so regenerate-driven swaps land in connected tabs without a
page refresh.
This is a presence / string-check test: it verifies the handler is
embedded in the rendered template but does NOT drive a real browser
(no headless runner is wired into this test environment). The end-to-
end behaviour receiving the event over SSE and replacing the prior
turn's DOM node — is therefore not exercised here; a manual smoke
check or future browser-driven test would close that gap.
"""
_seed_chat(tmp_path / "test.db")
response = client.get("/chats/chat_bot_a")
assert response.status_code == 200
body = response.text
# The handler must be wired against the SSE event name the backend
# publishes (chat.services.regenerate -> "turn_html_replace").
assert "turn_html_replace" in body
# Confirm the handler reads the JSON payload's ``supersedes_id`` so
# it can locate the prior turn node. The exact lookup mechanism may
# vary, but the field name is part of the contract with the backend.
assert "supersedes_id" in body
def test_rendered_turn_html_includes_event_id(client, tmp_path):
"""T86 follow-up: the chat-detail Jinja loop stamps
``id="turn-<event_id>"`` on every rendered turn DIV. Without this id
the ``turn_html_replace`` SSE handler's ``getElementById`` lookup
misses, falls through to ``insertAdjacentHTML('beforeend', )``, and
the regenerated turn appears APPENDED instead of swapped in-place
(rendering the primary handler path dead code exactly the gap the
T86 reviewer flagged).
Seed a user_turn + assistant_turn, GET the chat page, and assert the
response body carries both turns' event ids on the wrapper DIVs.
"""
db_path = tmp_path / "test.db"
_seed_chat(db_path)
with open_db(db_path) as conn:
ut_id = append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "hello bot",
"segments": [],
},
)
at_id = append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "Hi there.",
"truncated": False,
"user_turn_id": ut_id,
},
)
conn.commit()
response = client.get("/chats/chat_bot_a")
assert response.status_code == 200
body = response.text
# Both seeded turns must carry ``id="turn-<event_id>"`` so the SSE
# in-place swap can find them.
assert f'id="turn-{ut_id}"' in body
assert f'id="turn-{at_id}"' in body
+221
View File
@@ -0,0 +1,221 @@
"""Shared turn helpers (T83.2).
``chat.services.turn_common`` extracts two snippets that were duplicated
between ``chat.web.turns`` and ``chat.services.regenerate``: the recent
user-side / assistant_turn read, and the directed-pair edge gather for
the multi-pair state-update pass. These tests pin the helpers' behavior
independently of either call site.
"""
from __future__ import annotations
from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_event
from chat.eventlog.projector import project
from chat.services.turn_common import gather_prior_edges, read_recent_dialogue
def _seed_basic_chat(db_path):
"""Seed bot + chat + a couple of edges + one round of user/assistant
turns. Returns ``(user_turn_id, assistant_turn_id)``.
"""
apply_migrations(db_path)
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_a",
"affinity_delta": 7,
"trust_delta": 3,
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "you",
"target_id": "bot_a",
"chat_id": "chat_a",
"affinity_delta": 2,
"trust_delta": 1,
},
)
ut_id = append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_a",
"prose": "hello",
"segments": [],
},
)
at_id = append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_a",
"speaker_id": "bot_a",
"text": "Original.",
"truncated": False,
"user_turn_id": ut_id,
},
)
project(conn)
return ut_id, at_id
def test_read_recent_dialogue_returns_chronological_pairs(tmp_path):
"""``read_recent_dialogue`` returns oldest-first ``{speaker, text}``
entries scoped to the requested chat. Speaker is "you" for user-side
rows and the assistant_turn's ``speaker_id`` for bot rows.
"""
db = tmp_path / "test.db"
_seed_basic_chat(db)
with open_db(db) as conn:
out = read_recent_dialogue(conn, "chat_a", limit=10)
# Each entry now carries the source ``event_log.id`` as ``event_id``
# (T86 follow-up) so the chat-detail Jinja loop can stamp
# ``id="turn-<n>"`` on each rendered turn DIV — needed by the
# ``turn_html_replace`` SSE handler for in-place regenerate swaps.
speakers = [(e["speaker"], e["text"]) for e in out]
assert speakers == [
("you", "hello"),
("bot_a", "Original."),
]
assert all("event_id" in e and isinstance(e["event_id"], int) for e in out)
def test_read_recent_dialogue_filters_superseded_and_other_chats(tmp_path):
"""Superseded rows drop out (regenerate-aware). Rows scoped to a
different chat are also filtered. ``exclude_event_id`` excludes a
specific row even when it isn't superseded yet (regenerate uses this
to drop the original assistant_turn before the supersede UPDATE
lands).
"""
db = tmp_path / "test.db"
ut_id, at_id = _seed_basic_chat(db)
with open_db(db) as conn:
# Append a second user/assistant pair.
ut_id2 = append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_a",
"prose": "how are you",
"segments": [],
},
)
at_id2 = append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_a",
"speaker_id": "bot_a",
"text": "Second.",
"truncated": False,
"user_turn_id": ut_id2,
},
)
# And a row scoped to a different chat — must NOT appear.
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "other_chat",
"prose": "should be filtered",
"segments": [],
},
)
# Mark the first assistant_turn as superseded — must drop out.
conn.execute(
"UPDATE event_log SET superseded_by = ? WHERE id = ?",
(at_id2, at_id),
)
out = read_recent_dialogue(conn, "chat_a", limit=10)
# First (superseded) assistant turn dropped; "other_chat" rows
# filtered; first user_turn still present.
speakers = [(e["speaker"], e["text"]) for e in out]
assert speakers == [
("you", "hello"),
("you", "how are you"),
("bot_a", "Second."),
]
# exclude_event_id drops at_id2 even though it's not superseded.
out2 = read_recent_dialogue(
conn, "chat_a", limit=10, exclude_event_id=at_id2
)
speakers2 = [(e["speaker"], e["text"]) for e in out2]
assert ("bot_a", "Second.") not in speakers2
assert ("you", "how are you") in speakers2
# Ensure ut_id is still part of the dataset (sanity for the seed).
assert ut_id is not None
def test_gather_prior_edges_fills_missing_with_default(tmp_path):
"""``gather_prior_edges`` returns one entry per directed pair across
``present_ids``. Missing rows fall back to the schema default
50/50 baseline; existing rows carry their stored values.
"""
db = tmp_path / "test.db"
_seed_basic_chat(db)
with open_db(db) as conn:
out = gather_prior_edges(conn, ["bot_a", "you"])
# 2 entities -> 2 directed pairs (a->b and b->a, no self-pairs).
assert set(out.keys()) == {("bot_a", "you"), ("you", "bot_a")}
bot_to_you = out[("bot_a", "you")]
you_to_bot = out[("you", "bot_a")]
# Both edges seeded with deltas — they must reflect the projected
# affinity/trust (not the default 50/50).
assert bot_to_you["affinity"] == 57 # 50 + 7
assert bot_to_you["trust"] == 53 # 50 + 3
assert you_to_bot["affinity"] == 52
assert you_to_bot["trust"] == 51
# A pair with no row yet falls back to 50/50.
with open_db(db) as conn:
out_with_missing = gather_prior_edges(
conn, ["bot_a", "you", "ghost_bot"]
)
# 3 entities -> 6 directed pairs.
assert len(out_with_missing) == 6
fallback = out_with_missing[("bot_a", "ghost_bot")]
assert fallback["affinity"] == 50
assert fallback["trust"] == 50
assert fallback["summary"] == ""
+244
View File
@@ -1317,3 +1317,247 @@ def test_skip_command_does_not_run_narrative_classifier(
"assemble_narrative_prompt was called on the skip path; the "
"natural-language skip dispatch must bypass narrative assembly."
)
# ---------------------------------------------------------------------------
# Phase 3.5 (T82.1) — post_turn consumes pending meanwhile digests.
#
# The helper ``consume_pending_meanwhile_digests`` lives in
# chat.services.prompt and is now wired into the END of post_turn (after
# scene-close detection, before the response broadcast). This pins the
# wiring so future refactors don't accidentally drop the call and leave
# digests pending forever.
# ---------------------------------------------------------------------------
def test_post_turn_consumes_pending_meanwhile_digests(
app_state_setup, tmp_path
):
"""Seed a pending meanwhile digest via ``meanwhile_digest_created``,
POST a regular you-turn through post_turn, and assert:
1. A ``meanwhile_digest_consumed`` event lands in the event_log.
2. ``list_pending_meanwhile_digests`` returns empty after the turn.
The post_turn flow surfaces the digest in the prompt (T65) and then
consumes it (T82.1) so the next turn starts clean.
"""
_seed(tmp_path / "test.db")
db_path = tmp_path / "test.db"
# Seed a pending digest directly via the projection event. The scene_id
# field doesn't need to reference an existing meanwhile scene for the
# digest table — the FK is on the digest payload only.
with open_db(db_path) as conn:
append_and_apply(
conn,
kind="meanwhile_digest_created",
payload={
"scene_id": 99,
"chat_id": "chat_bot_a",
"summary": "While you were away, the bots talked.",
},
)
# Confirm the digest is pending before the turn lands.
from chat.state.meanwhile import list_pending_meanwhile_digests
assert len(list_pending_meanwhile_digests(conn, "chat_bot_a")) == 1
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
# Standard 4-slot queue: parse + narrative + 2 state-updates. No
# active scene so scene-close detection short-circuits without an LLM
# call (consistent with the no-guest regression test).
mock = _override_llm(
[canned_parse, "Hi there.", _zero_state(), _zero_state()]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All canned slots drained — no extra classifier calls fired.
assert mock._canned == []
with open_db(db_path) as conn:
# A meanwhile_digest_consumed event landed for the seeded digest.
consumed_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'meanwhile_digest_consumed' ORDER BY id"
).fetchall()
assert len(consumed_rows) == 1
# The pending list is empty after consumption.
from chat.state.meanwhile import list_pending_meanwhile_digests
assert list_pending_meanwhile_digests(conn, "chat_bot_a") == []
# ---------------------------------------------------------------------------
# Phase 3.5 (T82.2) — natural-language skip runs scene close detection.
#
# A user typing "fade out, skip an hour" should close the scene FIRST
# (so the close summary captures the closing scene's final beat) and
# THEN run the elision skip. Without this wiring, the skip dispatch
# branch bypasses scene close entirely.
# ---------------------------------------------------------------------------
def test_natural_language_skip_with_close_signal_closes_scene(
app_state_setup, tmp_path
):
"""Prose that hard-signals a close ("fade out, skip to morning") and
parses as ``intent=skip_elision`` must:
1. Land a ``scene_closed`` event before any skip event.
2. Run ``apply_scene_close_summary`` for the closing scene.
3. Land a ``time_skip_elision`` event AFTER the scene_closed.
Order matters the scene_closed id must be lower than the
time_skip_elision id in the event_log.
Canned queue (single-bot, scene seeded, NO prior dialogue rows):
1. parse_turn -> intent=skip_elision
2. detect_scene_close -> should_close=True
3. apply_scene_close_summary host POV
4. narrate_skip narration
detect_threads (T58.2 fires on every close) short-circuits when the
scene-scoped transcript is empty in this test no user/assistant
turns landed in scene 1 before the close, so no thread-detection
slot is needed.
"""
# Seed an open scene so detect_scene_close has something to act on.
db_path = tmp_path / "test.db"
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
"knowledge_facts": ["coworker"],
},
)
for entity_id, verb in [("you", "talking"), ("bot_a", "listening")]:
append_event(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
canned_parse = json.dumps(
{
"segments": [
{"kind": "narration", "text": "fade out, skip to morning"}
],
"intent": "skip_elision",
"landing_state_hint": "morning at home",
}
)
canned_close = json.dumps(
{"should_close": True, "reason": "fade out signaled"}
)
canned_pov = json.dumps(
{
"summary": "BotA noticed the day winding down.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
canned_narration = "The night fades and morning arrives."
mock = _override_llm(
[
canned_parse,
canned_close,
canned_pov,
canned_narration,
]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "fade out, skip to morning"},
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All 4 canned slots drained — close + skip both ran end-to-end.
assert mock._canned == []
with open_db(db_path) as conn:
# scene_closed and time_skip_elision both landed.
scene_close_rows = conn.execute(
"SELECT id FROM event_log WHERE kind = 'scene_closed'"
).fetchall()
skip_rows = conn.execute(
"SELECT id FROM event_log WHERE kind = 'time_skip_elision'"
).fetchall()
assert len(scene_close_rows) == 1, "scene_closed must land"
assert len(skip_rows) == 1, "time_skip_elision must land"
# Order: scene close first, then skip.
assert scene_close_rows[0][0] < skip_rows[0][0], (
"scene_closed must precede time_skip_elision in the event_log"
)