Files
chat/tests/test_turn_flow.py
T
Joseph Doherty baffeb3a44 chore: scene-close-on-cancel — strengthen regression test + document rationale (T108)
Investigation surfaced a transactional bug in the cancel path: when the
primary stream raises asyncio.CancelledError mid-stream, post_turn
re-raises at end-of-function, and open_db's dependency teardown skips
conn.commit() — rolling back ALL post-cancel writes including the
scene_closed event. The existing T74.3 regression test only passes
because asyncio is not imported at module scope, so CancelledError
becomes NameError (caught by except Exception, leaves cancelled=False).
Documented in turns.py + test docstring; deferred for triage.
2026-04-27 04:47:26 -04:00

1598 lines
58 KiB
Python

"""End-to-end turn flow (T19): user POSTs prose, server parses, streams via SSE.
Covers:
- POST ``/chats/<id>/turns`` returns 404 when the chat doesn't exist.
- A successful POST appends both a ``user_turn`` and an ``assistant_turn``
event in chronological order. The assistant payload carries the full
streamed text and ``truncated=False``.
- After a turn lands, the chat detail GET renders the user prose and the
assistant text from the event log.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from chat.app import app
from chat.db.connection import open_db
from chat.eventlog.log import append_and_apply, append_event
from chat.eventlog.projector import project
from chat.llm.mock import MockLLMClient
@pytest.fixture
def client(tmp_path, monkeypatch):
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
canned_response = "Hi there."
# Two state-update classifier calls fire after the assistant_turn
# (one per directed edge: bot->you, you->bot). We feed them benign
# zero-delta JSON so the existing assertions about ``user_turn`` /
# ``assistant_turn`` are unaffected.
canned_state_update = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
# T26 scene-close detection runs after the state-update pass. These
# tests don't seed an active scene so the classifier is short-circuited
# in turns.py — but the canned slot is harmless to leave in place,
# and adding it documents the order even when the call doesn't fire.
canned_scene_close = json.dumps(
{"should_close": False, "reason": "no signal"}
)
# Import here so env vars are visible to the dependency lookup.
from chat.web.kickoff import get_llm_client
mock = MockLLMClient(
canned=[
canned_parse,
canned_response,
canned_state_update,
canned_state_update,
canned_scene_close,
]
)
app.dependency_overrides[get_llm_client] = lambda: mock
with TestClient(app) as c:
# Disable the lifespan-managed background worker — it would
# otherwise try to score significance through Featherless with
# a fake test API key. Worker behavior is exercised directly in
# tests/test_significance.py with a mock LLM factory.
app.state.background_worker.enabled = False
c.mock_llm = mock # type: ignore[attr-defined]
yield c
app.dependency_overrides.clear()
def _seed(db_path: Path) -> None:
"""Author a bot, create a chat, and seed enough state for prompt assembly."""
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
# Seed an edge so the prompt assembler has something to render.
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
"knowledge_facts": ["coworker"],
},
)
# Activity for both speakers — required by the prompt assembler.
append_event(
conn,
kind="activity_change",
payload={
"entity_id": "you",
"posture": "sitting",
"action": {
"verb": "talking",
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
append_event(
conn,
kind="activity_change",
payload={
"entity_id": "bot_a",
"posture": "sitting",
"action": {
"verb": "listening",
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
def test_post_turn_404_when_chat_missing(client):
response = client.post("/chats/no_such/turns", data={"prose": "hello"})
assert response.status_code == 404
def test_post_turn_appends_user_and_assistant_events(client, tmp_path):
_seed(tmp_path / "test.db")
response = client.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
with open_db(tmp_path / "test.db") as conn:
cur = conn.execute(
"SELECT kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') ORDER BY id"
)
rows = cur.fetchall()
assert len(rows) == 2
assert rows[0][0] == "user_turn"
assert rows[1][0] == "assistant_turn"
user_payload = json.loads(rows[0][1])
assert user_payload["chat_id"] == "chat_bot_a"
assert user_payload["prose"] == "hello"
# Segments come from the canned classifier output.
assert any(
s.get("kind") == "dialogue" and s.get("text") == "hello"
for s in user_payload["segments"]
)
assistant_payload = json.loads(rows[1][1])
assert assistant_payload["chat_id"] == "chat_bot_a"
assert assistant_payload["speaker_id"] == "bot_a"
assert assistant_payload["text"] == "Hi there."
assert assistant_payload["truncated"] is False
def test_get_chat_renders_existing_turns(client, tmp_path):
_seed(tmp_path / "test.db")
post = client.post("/chats/chat_bot_a/turns", data={"prose": "hello"})
assert post.status_code == 204
response = client.get("/chats/chat_bot_a")
assert response.status_code == 200
body = response.text
assert "hello" in body
assert "Hi there." in body
# ---------------------------------------------------------------------------
# Phase 2 (T44) — multi-entity turn flow.
#
# These tests cover the post_turn flow when a guest is present: addressee
# detection, multi-pair state-update + multi-witness memory writes, and
# the optional interjection follow-on. Each test installs its own
# MockLLMClient with a canned-response queue tailored to the call shape
# of that scenario; the queue is documented at the top of each test so
# the orchestration is auditable.
# ---------------------------------------------------------------------------
def _bot_payload(bot_id: str, name: str, persona: str = "") -> dict:
return {
"id": bot_id,
"name": name,
"persona": persona or f"persona for {name}",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
}
def _seed_chat_with_guest(db_path: Path) -> None:
"""Author host BotA + guest BotB, create a chat with both wired in,
and seed an open scene plus minimal activity rows so the prompt
assembler sees a third party. Edges are seeded for all six directed
pairs at the schema-default 50/50 baseline so multi-pair state
updates land cleanly."""
with open_db(db_path) as conn:
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB"))
append_event(
conn,
kind="you_authored",
payload={"name": "Me", "pronouns": "they/them", "persona": ""},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"guest_bot_id": "bot_b",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
# Container + open scene so scene_close detection has something
# to act on in the per-POV summary test.
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a", "bot_b"],
},
)
# Seed all six directed edges so state-update writes land on
# initialized rows. Knowledge fact on bot_a -> you exercises
# the existing-fact preservation path.
for src, tgt, facts in [
("bot_a", "you", ["coworker"]),
("you", "bot_a", []),
("bot_b", "you", []),
("you", "bot_b", []),
("bot_a", "bot_b", []),
("bot_b", "bot_a", []),
]:
append_event(
conn,
kind="edge_update",
payload={
"source_id": src,
"target_id": tgt,
"chat_id": "chat_bot_a",
"knowledge_facts": facts,
},
)
for entity_id, verb in [
("you", "talking"),
("bot_a", "listening"),
("bot_b", "listening"),
]:
append_event(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
def _override_llm(canned: list[str]) -> MockLLMClient:
"""Wire a fresh ``MockLLMClient`` and return it so tests can introspect
the residual canned queue after the request."""
from chat.web.kickoff import get_llm_client
mock = MockLLMClient(canned=list(canned))
app.dependency_overrides[get_llm_client] = lambda: mock
return mock
def _zero_state() -> str:
return json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
@pytest.fixture
def app_state_setup(tmp_path, monkeypatch):
"""Same env wiring as the existing ``client`` fixture but without a
pre-installed MockLLMClient — the multi-entity tests pin their own
canned queues per scenario.
"""
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
with TestClient(app) as c:
app.state.background_worker.enabled = False
yield c
app.dependency_overrides.clear()
def test_single_bot_turn_no_guest_regression(app_state_setup, tmp_path):
"""No-guest regression: the canned-response queue remains parse +
narrative + 2 state-updates. Interjection is path-bypassed because
the chat has no guest, so ``detect_interjection`` is NOT invoked.
Ends with one user_turn, one assistant_turn, two edge_updates, and a
single ``memory_written``.
"""
_seed(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
mock = _override_llm(
[canned_parse, "Hi there.", _zero_state(), _zero_state()]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# No guest -> no interjection classifier call -> queue fully drained.
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
cur = conn.execute(
"SELECT kind FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn', 'edge_update', "
" 'memory_written') ORDER BY id"
)
kinds = [r[0] for r in cur.fetchall()]
user_turns = [k for k in kinds if k == "user_turn"]
assistant_turns = [k for k in kinds if k == "assistant_turn"]
edge_updates_after_seed = [k for k in kinds if k == "edge_update"]
memory_writes = [k for k in kinds if k == "memory_written"]
assert len(user_turns) == 1
assert len(assistant_turns) == 1
# Seed adds exactly one edge_update (bot_a -> you); the post-turn
# pass adds two more for a total of three.
assert len(edge_updates_after_seed) == 3
assert len(memory_writes) == 1
def test_multi_bot_turn_no_interjection(app_state_setup, tmp_path):
"""Chat has a guest; ``detect_interjection`` returns False. Verify:
1 user_turn + 1 assistant_turn + 6 *post-turn* edge_updates + 2
memory_written events. Single turn_html broadcast.
Canned queue (11 calls):
1. parse_turn
2. detect_addressee (T74.1) -> host
3. narrative stream (primary, addressee = host because the prose
doesn't name the guest)
4-9. 6 state-update calls (one per directed pair across {you,
bot_a, bot_b})
10. detect_interjection -> should_interject=False
11. detect_scene_close -> should_close=False
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello room"}]}
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Greetings.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_interject": False, "reason": "calm"}),
json.dumps({"should_close": False, "reason": "no signal"}),
]
mock = _override_llm(canned)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello room"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All 10 canned slots should have been consumed.
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
# Count post-turn edge_updates (i.e. those after the latest
# assistant_turn id).
max_at = conn.execute(
"SELECT MAX(id) FROM event_log WHERE kind = 'assistant_turn'"
).fetchone()[0]
cur = conn.execute(
"SELECT COUNT(*) FROM event_log "
"WHERE kind = 'edge_update' AND id > ?",
(max_at,),
)
post_turn_edge_updates = cur.fetchone()[0]
cur = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'user_turn'"
)
user_turn_count = cur.fetchone()[0]
cur = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'assistant_turn'"
)
assistant_turn_count = cur.fetchone()[0]
cur = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'"
)
memory_count = cur.fetchone()[0]
assert user_turn_count == 1
assert assistant_turn_count == 1
assert post_turn_edge_updates == 6
assert memory_count == 2
def test_multi_bot_turn_with_interjection(app_state_setup, tmp_path):
"""Chat has a guest; ``detect_interjection`` returns True. Verify:
1 user_turn + 2 assistant_turns + (6 + 6) post-turn edge_updates +
4 memory_written events.
Canned queue (17 calls):
1. parse_turn
2. detect_addressee (T74.1) -> host
3. narrative stream (primary)
4-9. 6 state-update calls (post-primary)
10. detect_interjection -> should_interject=True
11. narrative stream (interjection)
12-17. 6 state-update calls (post-interjection)
18. detect_scene_close -> should_close=False
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "tell me"}]}
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Primary beat.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_interject": True, "reason": "jealous"}),
"Interjection beat!",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_close": False, "reason": "no signal"}),
]
mock = _override_llm(canned)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "tell me"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
cur = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'assistant_turn'"
)
assistant_count = cur.fetchone()[0]
cur = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'"
)
memory_count = cur.fetchone()[0]
# All edge_updates after the FIRST assistant_turn are post-turn.
first_at = conn.execute(
"SELECT MIN(id) FROM event_log WHERE kind = 'assistant_turn'"
).fetchone()[0]
post_turn_edges = conn.execute(
"SELECT COUNT(*) FROM event_log "
"WHERE kind = 'edge_update' AND id > ?",
(first_at,),
).fetchone()[0]
# Both assistant_turn payloads should reference the same user_turn
# and the second one tags ``interjection_of`` the first speaker.
rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'assistant_turn' ORDER BY id"
).fetchall()
first_payload = json.loads(rows[0][0])
second_payload = json.loads(rows[1][0])
assert assistant_count == 2
assert memory_count == 4
assert post_turn_edges == 12
assert first_payload["text"] == "Primary beat."
assert second_payload["text"] == "Interjection beat!"
# The silent witness is the bot that wasn't the primary addressee.
assert second_payload["interjection_of"] == first_payload["speaker_id"]
assert second_payload["speaker_id"] != first_payload["speaker_id"]
assert first_payload["user_turn_id"] == second_payload["user_turn_id"]
def test_multi_bot_turn_scene_close_writes_per_pov_summaries(
app_state_setup, tmp_path
):
"""Chat has a guest, prose hard-signals a scene close, classifier
confirms. Verify a ``scene_closed`` event lands and per-POV summary
rewrites fire for both bots (memory.pov_summary changes for each).
Interjection short-circuits at False so the queue stays compact.
Canned queue (13 calls):
1. parse_turn
2. detect_addressee (T74.1) -> host
3. narrative stream (primary)
4-9. 6 state-update calls
10. detect_interjection -> False (no follow-on stream)
11. detect_scene_close -> True
12. apply_scene_close_summary host POV
13. apply_scene_close_summary guest POV
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{
"segments": [
{"kind": "narration", "text": "we are done here, fade out"}
]
}
)
pov_payload = json.dumps(
{
"summary": "BotA noticed the day winding down.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
pov_payload_guest = json.dumps(
{
"summary": "BotB watched the scene close.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Goodnight.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_interject": False, "reason": "calm"}),
json.dumps({"should_close": True, "reason": "fade out signaled"}),
pov_payload,
pov_payload_guest,
]
mock = _override_llm(canned)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "we are done here, fade out"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
cur = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'"
)
scene_close_count = cur.fetchone()[0]
# One memory_pov_summary manual_edit per witness.
cur = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'manual_edit'"
)
manual_edits = [json.loads(r[0]) for r in cur.fetchall()]
pov_edits = [
e for e in manual_edits
if e.get("target_kind") == "memory_pov_summary"
]
# After the rewrite, bot_a's scene-1 memory carries the host POV
# and bot_b's scene-1 memory carries the guest POV.
host_pov = conn.execute(
"SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = 1",
("bot_a",),
).fetchone()
guest_pov = conn.execute(
"SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = 1",
("bot_b",),
).fetchone()
assert scene_close_count == 1
# Two memory rewrites — one per witness.
assert len(pov_edits) == 2
assert host_pov is not None and "BotA noticed" in host_pov[0]
assert guest_pov is not None and "BotB watched" in guest_pov[0]
def test_addressee_detection_routes_to_named_bot(app_state_setup, tmp_path):
"""T74.1: the multi-entity addressee call goes through the classifier;
when the classifier returns the guest, the primary turn routes there.
Interjection (when fired) makes the host the silent witness and the
second assistant_turn carries the host as speaker.
Canned queue (with classifier-led addressee = guest):
1. parse_turn
2. detect_addressee -> bot_b (the guest)
3. narrative stream (primary, addressee = guest)
4-9. 6 state-update calls
10. detect_interjection -> True
11. interjection narrative stream
12-17. 6 state-update calls (post-interjection)
18. detect_scene_close -> False
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "BotB, what do you think?"}]}
)
canned = [
canned_parse,
json.dumps(
{
"addressee_id": "bot_b",
"confidence": "high",
"reason": "user named BotB",
}
),
"BotB pondering.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_interject": True, "reason": "host wants in"}),
"BotA chiming in.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_close": False, "reason": "no signal"}),
]
mock = _override_llm(canned)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "BotB, what do you think?"},
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'assistant_turn' ORDER BY id"
).fetchall()
primary_payload = json.loads(rows[0][0])
interjection_payload = json.loads(rows[1][0])
# Primary speaker is the guest because the addressee classifier
# picked bot_b for the prose ("BotB, what do you think?").
assert primary_payload["speaker_id"] == "bot_b"
# Interjection follow-on goes to the silent witness — the host.
assert interjection_payload["speaker_id"] == "bot_a"
assert interjection_payload["interjection_of"] == "bot_b"
def test_cancelled_turn_still_closes_scene_when_user_prose_signals_close(
app_state_setup, tmp_path
):
"""T74.3 regression: a cancelled primary stream still triggers scene
close when the user prose carries a hard close signal.
Rationale (also documented in turns.py near the close-detection
branch): close detection only consumes the user's prose, which is
fully appended to the event_log BEFORE streaming starts. The
cancelled bot beat doesn't invalidate the user's intent to close.
Implementation: install a MockLLMClient whose ``stream`` raises
CancelledError on the first iteration. The classifier calls (parse,
addressee, scene_close, per-POV summaries) are still served from
the canned queue. The post_turn route ultimately re-raises
CancelledError after recording the partial — TestClient surfaces
that as an exception, so we drive the request inside ``with
pytest.raises``. Despite the exception, the scene_closed event
must land in the event_log.
T108 NOTE — this test does NOT actually exercise the cancel path.
``_CancelOnStreamMock.stream`` writes ``raise asyncio.CancelledError``
but ``asyncio`` is not imported at module scope, so the first
iteration raises ``NameError`` (caught by ``except Exception:`` in
post_turn, which sets ``primary_truncated=True`` but leaves
``cancelled=False``). The function therefore returns 204 normally,
the dependency-managed connection commits, and ``scene_closed``
lands. Importing asyncio so the real CancelledError fires reveals
a transactional bug: ``post_turn``'s end-of-function re-raise
causes ``open_db``'s dependency teardown to skip ``conn.commit()``,
rolling back ALL post-cancel writes (user_turn, assistant_turn,
edge_updates, scene_closed). Deferred for triage — see T108 report.
"""
from typing import AsyncIterator, Sequence
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "narration", "text": "we are done here, fade out"}]}
)
pov_payload = json.dumps(
{
"summary": "BotA noticed the day winding down.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
pov_payload_guest = json.dumps(
{
"summary": "BotB watched the scene close.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
# Canned queue: parse + addressee + 6 state-updates +
# scene_close=True + 2 per-POV summaries. NO interjection slot
# because the cancel path short-circuits the interjection branch.
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
# NOTE: no narrative slot — the stream is hijacked below to
# raise CancelledError on first iteration; it never pulls a
# canned response.
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_close": True, "reason": "fade out signaled"}),
pov_payload,
pov_payload_guest,
]
class _CancelOnStreamMock:
"""Mock LLM client that serves ``generate`` from a canned queue
and raises CancelledError on the FIRST iteration of ``stream``.
Mirrors :class:`chat.llm.mock.MockLLMClient` for ``generate`` but
diverges on ``stream`` to simulate a mid-stream cancel.
"""
def __init__(self, canned: list[str]) -> None:
self._canned = list(canned)
async def generate(
self, messages: Sequence, *, model: str, **params
) -> str:
return self._canned.pop(0)
async def stream(
self, messages: Sequence, *, model: str, **params
) -> AsyncIterator[str]:
# Yield a CancelledError on first iteration to simulate the
# /turns/cancel route firing mid-stream.
raise asyncio.CancelledError
yield # pragma: no cover — keeps this an async generator.
from chat.web.kickoff import get_llm_client
mock = _CancelOnStreamMock(canned=list(canned))
app.dependency_overrides[get_llm_client] = lambda: mock
try:
# FastAPI/Starlette handles the re-raised CancelledError as an
# internal failure — TestClient surfaces it as a 500 response.
# We don't assert on the status here; the regression is whether
# the scene_closed event still landed in the event_log.
try:
app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "we are done here, fade out"},
)
except BaseException:
# Some Starlette/asyncio versions propagate the
# CancelledError out of the test client; that's fine — the
# partial-record + scene-close still ran before the raise.
pass
finally:
app.dependency_overrides.clear()
with open_db(tmp_path / "test.db") as conn:
scene_close_count = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'"
).fetchone()[0]
assistant_payload = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'assistant_turn' ORDER BY id"
).fetchall()
# T108: pin the ordering — user_turn must commit before
# scene_closed (close detection runs on prose that is already
# in the event_log) and any assistant_turn the cancel produced
# must come last (truncated record written after both).
ordered = conn.execute(
"SELECT id, kind FROM event_log "
"WHERE kind IN ('user_turn', 'scene_closed', 'assistant_turn') "
"ORDER BY id"
).fetchall()
# Scene close lands despite the cancel.
assert scene_close_count == 1
# The cancelled assistant_turn was still recorded (truncated=True).
assert len(assistant_payload) == 1
assert json.loads(assistant_payload[0][0])["truncated"] is True
# T108 ordering pin: user_turn lands first, the truncated
# assistant_turn (if any) is committed BEFORE the scene_close
# decision fires, and scene_closed lands last. Close detection
# relies on user prose being committed to the event_log BEFORE
# the close decision runs — and the cancelled assistant beat is
# recorded as a partial before close-detection too.
kinds_in_order = [row[1] for row in ordered]
user_idx = kinds_in_order.index("user_turn")
close_idx = kinds_in_order.index("scene_closed")
assert user_idx < close_idx
if "assistant_turn" in kinds_in_order:
assert user_idx < kinds_in_order.index("assistant_turn") < close_idx
def test_interjection_enqueues_significance_job(app_state_setup, tmp_path):
"""T74.2: when an interjection fires, the interjection memory is
enqueued for significance scoring just like the primary memory.
Capture enqueued ``SignificanceJob``s by replacing the background
worker's ``enqueue`` method with a list-append. Without T74.2, the
interjection memory would never be scored — only the primary's
enqueue would land. We therefore expect TWO jobs after a turn that
has both a primary and an interjection beat: one for the primary
memory, one for the interjection memory.
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "tell me"}]}
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Primary beat.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_interject": True, "reason": "jealous"}),
"Interjection beat!",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_close": False, "reason": "no signal"}),
]
_override_llm(canned)
captured_jobs: list = []
worker = app.state.background_worker
# Re-enable enqueue capture even though the worker's loop is disabled
# — we want to count enqueues without the loop running classifier work.
worker.enabled = True
original_enqueue = worker.enqueue
worker.enqueue = captured_jobs.append # type: ignore[assignment]
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "tell me"}
)
assert response.status_code == 204
finally:
worker.enqueue = original_enqueue # type: ignore[assignment]
worker.enabled = False
app.dependency_overrides.clear()
# Expect 2 enqueues: 1 for the primary memory + 1 for the
# interjection memory.
assert len(captured_jobs) == 2
# Both jobs should reference distinct memory ids — the primary's
# host-POV memory and the interjection's host-POV memory.
memory_ids = [job.memory_id for job in captured_jobs]
assert len(set(memory_ids)) == 2
# The two narrative texts should be the two streamed beats.
narrative_texts = sorted(job.narrative_text for job in captured_jobs)
assert narrative_texts == ["Interjection beat!", "Primary beat."]
# ---------------------------------------------------------------------------
# Phase 3 (T61) — per-turn event-lifecycle detection + completion promotion.
#
# After the post-turn classifier passes (memory write, state update,
# interjection check) and BEFORE scene-close detection, ``post_turn``
# calls :func:`detect_event_transitions`. Each transition becomes one
# of ``event_started`` / ``event_completed`` / ``event_cancelled``. A
# completed event is followed inline by ``promote_completed_event`` so
# the props it carries (knowledge_facts, etc.) land in state
# synchronously.
#
# When no active events are seeded the classifier short-circuits without
# an LLM call (per T52) — the canned queue therefore needs ZERO extra
# slots in that case.
# ---------------------------------------------------------------------------
def test_turn_with_event_transition_appends_started_event(
app_state_setup, tmp_path
):
"""A planned event becomes active when the classifier reports a
``new_status='active'`` transition for that event_id.
Canned queue (5 calls — single-bot, no scene seeded):
1. parse_turn
2. narrative stream
3. state-update bot_a -> you
4. state-update you -> bot_a
5. detect_event_transitions -> 1 transition (active)
"""
_seed(tmp_path / "test.db")
# Seed a planned event so list_active_events returns 1 row. Use
# append_and_apply so we don't re-replay the prior chat_created event
# (whose handler is INSERT-not-IGNORE and would 409 on replay).
with open_db(tmp_path / "test.db") as conn:
append_and_apply(
conn,
kind="event_planned",
payload={
"event_id": "evt_1",
"chat_id": "chat_bot_a",
"kind": "story_event",
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "they arrived"}]}
)
canned_event_decision = json.dumps(
{
"transitions": [
{
"event_id": "evt_1",
"new_status": "active",
"reason": "they arrived",
}
]
}
)
mock = _override_llm(
[
canned_parse,
"They walk in.",
_zero_state(),
_zero_state(),
canned_event_decision,
]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "they arrived"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All 5 canned slots consumed.
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
# event_started landed in event_log.
rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'event_started' ORDER BY id"
).fetchall()
assert len(rows) == 1
started_payload = json.loads(rows[0][0])
assert started_payload["event_id"] == "evt_1"
assert started_payload["started_at"] == "2026-04-26T20:00:00+00:00"
# The events projection row reflects the active status.
ev_row = conn.execute(
"SELECT status, started_at FROM events WHERE event_id = ?",
("evt_1",),
).fetchone()
assert ev_row is not None
assert ev_row[0] == "active"
assert ev_row[1] == "2026-04-26T20:00:00+00:00"
def test_turn_with_event_completion_runs_promotion(app_state_setup, tmp_path):
"""An active event with knowledge_facts in props completes; the
inline call to ``promote_completed_event`` emits the corresponding
``edge_update``.
"""
_seed(tmp_path / "test.db")
# Seed: planned -> started so the event is currently active. Props
# carry a knowledge_fact that promotion will turn into an edge_update.
# Use append_and_apply (not project) to avoid re-replaying chat_created.
with open_db(tmp_path / "test.db") as conn:
append_and_apply(
conn,
kind="event_planned",
payload={
"event_id": "evt_2",
"chat_id": "chat_bot_a",
"kind": "story_event",
"props": {
"knowledge_facts": [
{
"owner_id": "bot_a",
"target_id": "you",
"fact": "Maya likes pottery",
}
]
},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
append_and_apply(
conn,
kind="event_started",
payload={
"event_id": "evt_2",
"started_at": "2026-04-30T19:00:00+00:00",
},
)
# Snapshot the max event_log id so we can assert on rows AFTER the turn.
with open_db(tmp_path / "test.db") as conn:
before_id = conn.execute(
"SELECT COALESCE(MAX(id), 0) FROM event_log"
).fetchone()[0]
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "we wrap it up"}]}
)
canned_event_decision = json.dumps(
{
"transitions": [
{
"event_id": "evt_2",
"new_status": "completed",
"reason": "wrapped",
}
]
}
)
mock = _override_llm(
[
canned_parse,
"They wrap it up.",
_zero_state(),
_zero_state(),
canned_event_decision,
]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "we wrap it up"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
# event_completed landed.
completed_rows = conn.execute(
"SELECT id, payload_json FROM event_log "
"WHERE kind = 'event_completed' AND id > ? ORDER BY id",
(before_id,),
).fetchall()
assert len(completed_rows) == 1
completed_payload = json.loads(completed_rows[0][1])
assert completed_payload["event_id"] == "evt_2"
completed_id = completed_rows[0][0]
# promote_completed_event ran inline AFTER event_completed: the
# follow-on edge_update carries the knowledge fact and is tagged
# with source=event_promotion.
promo_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'edge_update' AND id > ? ORDER BY id",
(completed_id,),
).fetchall()
promo_facts: list[str] = []
for (payload_json,) in promo_rows:
p = json.loads(payload_json)
if p.get("source") == "event_promotion":
promo_facts.extend(p.get("knowledge_facts") or [])
assert "Maya likes pottery" in promo_facts
def test_turn_with_no_active_events_skips_classifier(app_state_setup, tmp_path):
"""When no active events are seeded, ``detect_event_transitions``
short-circuits without an LLM call (per T52). The canned queue must
therefore have ZERO event-detection slots — same shape as the
Phase 2 no-guest baseline.
"""
_seed(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
# Only 4 slots: parse + narrative + 2 state-updates. NO extra slot for
# event-detection — non-existent active_events causes the helper to
# short-circuit before pulling from the queue.
mock = _override_llm(
[canned_parse, "Hi there.", _zero_state(), _zero_state()]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# Queue fully drained — no canned slot was consumed by event detection.
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
for kind in ("event_started", "event_completed", "event_cancelled"):
count = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = ?", (kind,)
).fetchone()[0]
assert count == 0, f"expected zero {kind} events, got {count}"
# ---------------------------------------------------------------------------
# Phase 3 (T62) — natural-language skip-command surface.
#
# The classifier may flag prose as a time-skip directive via
# ``ParsedTurn.intent``. Elision runs through the shared controller in
# :mod:`chat.web.skip` and short-circuits the regular narrative path;
# jump returns 422 directing the user to the drawer's structured form
# (Phase 3 simpler path — natural-language jump time derivation is too
# fragile for v1 without the structured surface).
# ---------------------------------------------------------------------------
def test_elision_skip_via_natural_language(app_state_setup, tmp_path):
"""User prose 'skip to when we arrive at the park' classifies as
``intent='skip_elision'``. The post_turn handler short-circuits the
narrative path, advances the chat clock by an hour stub, appends a
``time_skip_elision`` event AND an ``assistant_turn`` carrying the
canned narration. No ``user_turn`` is emitted on the skip path.
Canned queue: 1 parse_turn (intent=skip_elision) + 1 narration
string (consumed by ``narrate_skip``). No state-update / scene-close
/ event-detection slots — those branches are bypassed entirely.
"""
_seed(tmp_path / "test.db")
canned_parse = json.dumps(
{
"segments": [
{"kind": "dialogue", "text": "skip to when we arrive at the park"}
],
"intent": "skip_elision",
"landing_state_hint": "we arrive at the park",
}
)
canned_narration = "We pull up to the park entrance, sun low in the sky."
mock = _override_llm([canned_parse, canned_narration])
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "skip to when we arrive at the park"},
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# Both canned slots drained — no other classifier branches ran.
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
# time_skip_elision landed.
skip_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'time_skip_elision' ORDER BY id"
).fetchall()
assert len(skip_rows) == 1
sp = json.loads(skip_rows[0][0])
assert sp["chat_id"] == "chat_bot_a"
# 1-hour stub from the seeded chat clock (20:00 -> 21:00).
assert sp["new_time"].startswith("2026-04-26T21:00:00")
# Chat clock advanced via the projector.
from chat.state.world import get_chat
chat = get_chat(conn, "chat_bot_a")
assert chat["time"].startswith("2026-04-26T21:00:00")
# An assistant_turn carrying the canned narration was appended.
turn_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'assistant_turn' ORDER BY id"
).fetchall()
assert len(turn_rows) == 1
tp = json.loads(turn_rows[0][0])
assert tp["chat_id"] == "chat_bot_a"
assert tp["text"] == canned_narration
assert tp["speaker_id"] == "bot_a"
assert tp["truncated"] is False
# No user_turn lands on the skip path — the natural-language
# skip is a command, not a beat the bots should remember.
user_count = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'user_turn'"
).fetchone()[0]
assert user_count == 0
def test_jump_skip_via_natural_language_returns_422(app_state_setup, tmp_path):
"""User prose 'next morning' classifies as ``intent='skip_jump'``.
The handler returns 422 with a guidance payload pointing the author
at the drawer's structured jump form. No event is emitted — the
drawer form is the only entry point for jump skips in Phase 3.
"""
_seed(tmp_path / "test.db")
canned_parse = json.dumps(
{
"segments": [{"kind": "dialogue", "text": "next morning"}],
"intent": "skip_jump",
"landing_state_hint": "",
}
)
# Only one canned slot — parse — because the 422 fallback short-
# circuits before any other classifier runs.
mock = _override_llm([canned_parse])
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "next morning"}
)
assert response.status_code == 422
body = response.json()
# Guidance payload mentions the drawer so the client can surface
# the right CTA; we don't pin the exact wording.
assert "drawer" in body.get("error", "").lower()
finally:
app.dependency_overrides.clear()
# Parse slot consumed; no follow-on classifier calls.
assert mock._canned == []
with open_db(tmp_path / "test.db") as conn:
for kind in (
"user_turn",
"assistant_turn",
"time_skip_elision",
"time_skip_jump",
):
count = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = ?", (kind,)
).fetchone()[0]
assert count == 0, f"expected zero {kind} on jump-via-NL, got {count}"
def test_skip_command_does_not_run_narrative_classifier(
app_state_setup, tmp_path, monkeypatch
):
"""The skip dispatch branch must bypass the narrative-prompt assembly
entirely. We monkeypatch ``assemble_narrative_prompt`` (re-bound on
the ``chat.web.turns`` module since the handler imports it by name)
and assert the call count is zero after the elision skip lands.
"""
_seed(tmp_path / "test.db")
canned_parse = json.dumps(
{
"segments": [
{"kind": "dialogue", "text": "skip to when we arrive at the park"}
],
"intent": "skip_elision",
"landing_state_hint": "we arrive at the park",
}
)
canned_narration = "We arrive moments later."
mock = _override_llm([canned_parse, canned_narration])
call_counter = {"n": 0}
def _spy(*args, **kwargs):
call_counter["n"] += 1
return []
# Patch the symbol at the handler's import site so we can assert
# the skip path bypasses prompt assembly even when the symbol still
# exists in the module namespace.
from chat.web import turns as turns_mod
monkeypatch.setattr(turns_mod, "assemble_narrative_prompt", _spy)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "skip to when we arrive at the park"},
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
assert mock._canned == []
assert call_counter["n"] == 0, (
"assemble_narrative_prompt was called on the skip path; the "
"natural-language skip dispatch must bypass narrative assembly."
)
# ---------------------------------------------------------------------------
# Phase 3.5 (T82.1) — post_turn consumes pending meanwhile digests.
#
# The helper ``consume_pending_meanwhile_digests`` lives in
# chat.services.prompt and is now wired into the END of post_turn (after
# scene-close detection, before the response broadcast). This pins the
# wiring so future refactors don't accidentally drop the call and leave
# digests pending forever.
# ---------------------------------------------------------------------------
def test_post_turn_consumes_pending_meanwhile_digests(
app_state_setup, tmp_path
):
"""Seed a pending meanwhile digest via ``meanwhile_digest_created``,
POST a regular you-turn through post_turn, and assert:
1. A ``meanwhile_digest_consumed`` event lands in the event_log.
2. ``list_pending_meanwhile_digests`` returns empty after the turn.
The post_turn flow surfaces the digest in the prompt (T65) and then
consumes it (T82.1) so the next turn starts clean.
"""
_seed(tmp_path / "test.db")
db_path = tmp_path / "test.db"
# Seed a pending digest directly via the projection event. The scene_id
# field doesn't need to reference an existing meanwhile scene for the
# digest table — the FK is on the digest payload only.
with open_db(db_path) as conn:
append_and_apply(
conn,
kind="meanwhile_digest_created",
payload={
"scene_id": 99,
"chat_id": "chat_bot_a",
"summary": "While you were away, the bots talked.",
},
)
# Confirm the digest is pending before the turn lands.
from chat.state.meanwhile import list_pending_meanwhile_digests
assert len(list_pending_meanwhile_digests(conn, "chat_bot_a")) == 1
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
# Standard 4-slot queue: parse + narrative + 2 state-updates. No
# active scene so scene-close detection short-circuits without an LLM
# call (consistent with the no-guest regression test).
mock = _override_llm(
[canned_parse, "Hi there.", _zero_state(), _zero_state()]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All canned slots drained — no extra classifier calls fired.
assert mock._canned == []
with open_db(db_path) as conn:
# A meanwhile_digest_consumed event landed for the seeded digest.
consumed_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'meanwhile_digest_consumed' ORDER BY id"
).fetchall()
assert len(consumed_rows) == 1
# The pending list is empty after consumption.
from chat.state.meanwhile import list_pending_meanwhile_digests
assert list_pending_meanwhile_digests(conn, "chat_bot_a") == []
# ---------------------------------------------------------------------------
# Phase 3.5 (T82.2) — natural-language skip runs scene close detection.
#
# A user typing "fade out, skip an hour" should close the scene FIRST
# (so the close summary captures the closing scene's final beat) and
# THEN run the elision skip. Without this wiring, the skip dispatch
# branch bypasses scene close entirely.
# ---------------------------------------------------------------------------
def test_natural_language_skip_with_close_signal_closes_scene(
app_state_setup, tmp_path
):
"""Prose that hard-signals a close ("fade out, skip to morning") and
parses as ``intent=skip_elision`` must:
1. Land a ``scene_closed`` event before any skip event.
2. Run ``apply_scene_close_summary`` for the closing scene.
3. Land a ``time_skip_elision`` event AFTER the scene_closed.
Order matters — the scene_closed id must be lower than the
time_skip_elision id in the event_log.
Canned queue (single-bot, scene seeded, NO prior dialogue rows):
1. parse_turn -> intent=skip_elision
2. detect_scene_close -> should_close=True
3. apply_scene_close_summary host POV
4. narrate_skip narration
detect_threads (T58.2 fires on every close) short-circuits when the
scene-scoped transcript is empty — in this test no user/assistant
turns landed in scene 1 before the close, so no thread-detection
slot is needed.
"""
# Seed an open scene so detect_scene_close has something to act on.
db_path = tmp_path / "test.db"
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
"knowledge_facts": ["coworker"],
},
)
for entity_id, verb in [("you", "talking"), ("bot_a", "listening")]:
append_event(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
canned_parse = json.dumps(
{
"segments": [
{"kind": "narration", "text": "fade out, skip to morning"}
],
"intent": "skip_elision",
"landing_state_hint": "morning at home",
}
)
canned_close = json.dumps(
{"should_close": True, "reason": "fade out signaled"}
)
canned_pov = json.dumps(
{
"summary": "BotA noticed the day winding down.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
canned_narration = "The night fades and morning arrives."
mock = _override_llm(
[
canned_parse,
canned_close,
canned_pov,
canned_narration,
]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "fade out, skip to morning"},
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All 4 canned slots drained — close + skip both ran end-to-end.
assert mock._canned == []
with open_db(db_path) as conn:
# scene_closed and time_skip_elision both landed.
scene_close_rows = conn.execute(
"SELECT id FROM event_log WHERE kind = 'scene_closed'"
).fetchall()
skip_rows = conn.execute(
"SELECT id FROM event_log WHERE kind = 'time_skip_elision'"
).fetchall()
assert len(scene_close_rows) == 1, "scene_closed must land"
assert len(skip_rows) == 1, "time_skip_elision must land"
# Order: scene close first, then skip.
assert scene_close_rows[0][0] < skip_rows[0][0], (
"scene_closed must precede time_skip_elision in the event_log"
)