merge: T82 turns.py wiring (consume meanwhile digests + skip runs scene close)

This commit is contained in:
Joseph Doherty
2026-04-26 22:07:46 -04:00
3 changed files with 310 additions and 10 deletions
+56 -1
View File
@@ -64,7 +64,10 @@ from chat.services.event_promotion import promote_completed_event
from chat.services.interjection import detect_interjection from chat.services.interjection import detect_interjection
from chat.services.memory_write import record_turn_memory_for_present from chat.services.memory_write import record_turn_memory_for_present
from chat.services.multi_state_update import compute_state_updates_for_present from chat.services.multi_state_update import compute_state_updates_for_present
from chat.services.prompt import assemble_narrative_prompt from chat.services.prompt import (
assemble_narrative_prompt,
consume_pending_meanwhile_digests,
)
from chat.services.rewind import compute_rewind_preview, execute_rewind from chat.services.rewind import compute_rewind_preview, execute_rewind
from chat.services.scene_close import detect_scene_close from chat.services.scene_close import detect_scene_close
from chat.services.scene_summarize import apply_scene_close_summary from chat.services.scene_summarize import apply_scene_close_summary
@@ -314,6 +317,49 @@ async def post_turn(
) )
if intent == "skip_elision": if intent == "skip_elision":
# T82.2: run scene-close detection on the user's prose BEFORE
# the skip controller fires. Prose like "fade out, skip an hour"
# carries both a close signal and a skip directive; we want the
# close summary to capture the closing scene's final beat (and
# promote per-POV memories) before the time advances. Order
# matters: scene close -> skip narration -> time advance.
#
# When there's no active scene (or the prose carries no close
# signal) ``detect_scene_close`` returns the safe
# ``should_close=False`` default and we drop straight to the
# skip controller — same behavior as today, no extra cost.
skip_scene = active_scene(conn, chat_id)
if skip_scene is not None:
container = None
if skip_scene.get("container_id") is not None:
container = get_container(conn, skip_scene["container_id"])
container_name = container["name"] if container else "unknown"
close_decision = await detect_scene_close(
client,
model=settings.classifier_model,
prose=prose,
current_container_name=container_name,
)
if close_decision.should_close:
append_and_apply(
conn,
kind="scene_closed",
payload={
"scene_id": skip_scene["id"],
"ended_at": chat.get("time"),
"significance": 0,
},
)
await apply_scene_close_summary(
conn,
client,
classifier_model=settings.classifier_model,
chat_id=chat_id,
scene_id=skip_scene["id"],
host_bot_id=host_bot["id"],
timeout_s=settings.classifier_timeout_s,
)
# Derive ``new_time`` from the chat clock. Phase 3 stub: bump by # Derive ``new_time`` from the chat clock. Phase 3 stub: bump by
# 1 hour. The drawer's elision form is the structured path when # 1 hour. The drawer's elision form is the structured path when
# the author wants a specific landing time; here the goal is # the author wants a specific landing time; here the goal is
@@ -886,6 +932,15 @@ async def post_turn(
timeout_s=settings.classifier_timeout_s, timeout_s=settings.classifier_timeout_s,
) )
# 9a. Consume any pending meanwhile digests now that the assistant_turn
# (which surfaced them in its prompt via T65's helper) has landed. The
# spec's "first you-turn AFTER meanwhile close consumes the digest"
# semantics are preserved by running this AFTER scene-close detection
# — anything pending right now belongs to the prompt we just answered,
# so it's safe to mark consumed and the NEXT turn starts clean.
# Idempotent: re-calling produces zero events when nothing's pending.
consume_pending_meanwhile_digests(conn, chat_id)
# 10. Broadcast a JSON completion event (for JS consumers) and an HTML # 10. Broadcast a JSON completion event (for JS consumers) and an HTML
# fragment event (for HTMX SSE swap-into-timeline). One pair per # fragment event (for HTMX SSE swap-into-timeline). One pair per
# written assistant_turn so the timeline ends up with both the # written assistant_turn so the timeline ends up with both the
+10 -9
View File
@@ -39,11 +39,11 @@ Cross-feature notes discovered while writing these tests:
swallowed. Tests that don't care about thread coverage can omit the swallowed. Tests that don't care about thread coverage can omit the
slot; test 2 includes a valid thread response to exercise the path. slot; test 2 includes a valid thread response to exercise the path.
- ``consume_pending_meanwhile_digests`` is defined in chat.services.prompt - ``consume_pending_meanwhile_digests`` is defined in chat.services.prompt
but is NOT currently wired into the post_turn flow. The digest stays and is wired into the END of post_turn (after scene-close detection)
pending across turns until the helper is called explicitly. Test 4 by T82.1. Test 4 still drives the helper directly because it asserts
reflects this: it asserts the digest renders pre-consumption AND the helper's contract in isolation (no post_turn round-trip in scope);
post-consumption (driven via the helper directly), and that the the explicit call doubles as defensive coverage and is idempotent — a
meanwhile_digest_consumed event lands in the event_log. second call on already-consumed digests is a no-op.
- The host-only ``apply_scene_close_summary`` canned queue layout is - The host-only ``apply_scene_close_summary`` canned queue layout is
``[host_pov, thread_detection]`` (2 slots) when a single bot is present ``[host_pov, thread_detection]`` (2 slots) when a single bot is present
and there are dialogue rows, with thread_detection being optional / and there are dialogue rows, with thread_detection being optional /
@@ -769,10 +769,11 @@ def test_meanwhile_close_digest_surfaces_then_consumed(
— the digest is gone, and a meanwhile_digest_consumed event landed. — the digest is gone, and a meanwhile_digest_consumed event landed.
Cross-feature finding: ``consume_pending_meanwhile_digests`` is Cross-feature finding: ``consume_pending_meanwhile_digests`` is
defined in chat.services.prompt but is NOT wired into the post_turn defined in chat.services.prompt and wired into post_turn by T82.1
flow. The digest stays pending across turns until callers invoke (after scene-close detection). This test exercises the helper
the helper. Test exercises the helper directly so the consumption directly so the consumption contract is pinned in isolation from
contract is pinned independent of any future post_turn integration. the post_turn round-trip; T82.1's wiring is covered by a dedicated
test in tests/test_turn_flow.py.
Canned queue for the meanwhile turn: Canned queue for the meanwhile turn:
1. parse_turn 1. parse_turn
+244
View File
@@ -1317,3 +1317,247 @@ def test_skip_command_does_not_run_narrative_classifier(
"assemble_narrative_prompt was called on the skip path; the " "assemble_narrative_prompt was called on the skip path; the "
"natural-language skip dispatch must bypass narrative assembly." "natural-language skip dispatch must bypass narrative assembly."
) )
# ---------------------------------------------------------------------------
# Phase 3.5 (T82.1) — post_turn consumes pending meanwhile digests.
#
# The helper ``consume_pending_meanwhile_digests`` lives in
# chat.services.prompt and is now wired into the END of post_turn (after
# scene-close detection, before the response broadcast). This pins the
# wiring so future refactors don't accidentally drop the call and leave
# digests pending forever.
# ---------------------------------------------------------------------------
def test_post_turn_consumes_pending_meanwhile_digests(
app_state_setup, tmp_path
):
"""Seed a pending meanwhile digest via ``meanwhile_digest_created``,
POST a regular you-turn through post_turn, and assert:
1. A ``meanwhile_digest_consumed`` event lands in the event_log.
2. ``list_pending_meanwhile_digests`` returns empty after the turn.
The post_turn flow surfaces the digest in the prompt (T65) and then
consumes it (T82.1) so the next turn starts clean.
"""
_seed(tmp_path / "test.db")
db_path = tmp_path / "test.db"
# Seed a pending digest directly via the projection event. The scene_id
# field doesn't need to reference an existing meanwhile scene for the
# digest table — the FK is on the digest payload only.
with open_db(db_path) as conn:
append_and_apply(
conn,
kind="meanwhile_digest_created",
payload={
"scene_id": 99,
"chat_id": "chat_bot_a",
"summary": "While you were away, the bots talked.",
},
)
# Confirm the digest is pending before the turn lands.
from chat.state.meanwhile import list_pending_meanwhile_digests
assert len(list_pending_meanwhile_digests(conn, "chat_bot_a")) == 1
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
# Standard 4-slot queue: parse + narrative + 2 state-updates. No
# active scene so scene-close detection short-circuits without an LLM
# call (consistent with the no-guest regression test).
mock = _override_llm(
[canned_parse, "Hi there.", _zero_state(), _zero_state()]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All canned slots drained — no extra classifier calls fired.
assert mock._canned == []
with open_db(db_path) as conn:
# A meanwhile_digest_consumed event landed for the seeded digest.
consumed_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'meanwhile_digest_consumed' ORDER BY id"
).fetchall()
assert len(consumed_rows) == 1
# The pending list is empty after consumption.
from chat.state.meanwhile import list_pending_meanwhile_digests
assert list_pending_meanwhile_digests(conn, "chat_bot_a") == []
# ---------------------------------------------------------------------------
# Phase 3.5 (T82.2) — natural-language skip runs scene close detection.
#
# A user typing "fade out, skip an hour" should close the scene FIRST
# (so the close summary captures the closing scene's final beat) and
# THEN run the elision skip. Without this wiring, the skip dispatch
# branch bypasses scene close entirely.
# ---------------------------------------------------------------------------
def test_natural_language_skip_with_close_signal_closes_scene(
app_state_setup, tmp_path
):
"""Prose that hard-signals a close ("fade out, skip to morning") and
parses as ``intent=skip_elision`` must:
1. Land a ``scene_closed`` event before any skip event.
2. Run ``apply_scene_close_summary`` for the closing scene.
3. Land a ``time_skip_elision`` event AFTER the scene_closed.
Order matters — the scene_closed id must be lower than the
time_skip_elision id in the event_log.
Canned queue (single-bot, scene seeded, NO prior dialogue rows):
1. parse_turn -> intent=skip_elision
2. detect_scene_close -> should_close=True
3. apply_scene_close_summary host POV
4. narrate_skip narration
detect_threads (T58.2 fires on every close) short-circuits when the
scene-scoped transcript is empty — in this test no user/assistant
turns landed in scene 1 before the close, so no thread-detection
slot is needed.
"""
# Seed an open scene so detect_scene_close has something to act on.
db_path = tmp_path / "test.db"
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
"knowledge_facts": ["coworker"],
},
)
for entity_id, verb in [("you", "talking"), ("bot_a", "listening")]:
append_event(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
canned_parse = json.dumps(
{
"segments": [
{"kind": "narration", "text": "fade out, skip to morning"}
],
"intent": "skip_elision",
"landing_state_hint": "morning at home",
}
)
canned_close = json.dumps(
{"should_close": True, "reason": "fade out signaled"}
)
canned_pov = json.dumps(
{
"summary": "BotA noticed the day winding down.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
canned_narration = "The night fades and morning arrives."
mock = _override_llm(
[
canned_parse,
canned_close,
canned_pov,
canned_narration,
]
)
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "fade out, skip to morning"},
)
assert response.status_code == 204
finally:
app.dependency_overrides.clear()
# All 4 canned slots drained — close + skip both ran end-to-end.
assert mock._canned == []
with open_db(db_path) as conn:
# scene_closed and time_skip_elision both landed.
scene_close_rows = conn.execute(
"SELECT id FROM event_log WHERE kind = 'scene_closed'"
).fetchall()
skip_rows = conn.execute(
"SELECT id FROM event_log WHERE kind = 'time_skip_elision'"
).fetchall()
assert len(scene_close_rows) == 1, "scene_closed must land"
assert len(skip_rows) == 1, "time_skip_elision must land"
# Order: scene close first, then skip.
assert scene_close_rows[0][0] < skip_rows[0][0], (
"scene_closed must precede time_skip_elision in the event_log"
)