fix: scope thread detection transcript to closing scene (T80.2)

apply_scene_close_summary fed detect_threads the chat-wide last-50
turns. When a chat has accumulated multiple scenes' worth of dialogue,
that bleeds prior-scene turns into the second close's classifier prompt
and risks mis-attributing threads (closing one that opened earlier,
re-opening one that already closed).

Add an optional ``since_event_id`` kwarg to ``_read_recent_dialogue``
that lower-bounds by event_log id, plus a ``_scene_opened_event_id``
helper that resolves the scene-open event for a given scene_id. Wire
both into the thread-detection call site so its scene_transcript
holds only the closing scene's turns. The per-POV summarizer keeps the
chat-wide approximation it had before — that's intentional.

Adds test_thread_detection_uses_scene_scoped_transcript.
This commit is contained in:
Joseph Doherty
2026-04-26 21:48:44 -04:00
parent d123684f9a
commit dae481eb92
2 changed files with 238 additions and 14 deletions
+103 -14
View File
@@ -123,7 +123,11 @@ async def summarize_scene(
def _read_recent_dialogue( def _read_recent_dialogue(
conn: Connection, chat_id: str, *, limit: int = 50 conn: Connection,
chat_id: str,
*,
limit: int = 50,
since_event_id: int | None = None,
) -> list[dict]: ) -> list[dict]:
"""Pull the last ``limit`` user/assistant turns for ``chat_id``. """Pull the last ``limit`` user/assistant turns for ``chat_id``.
@@ -132,14 +136,29 @@ def _read_recent_dialogue(
the most recent turns of the chat. Superseded and hidden rows are the most recent turns of the chat. Superseded and hidden rows are
filtered out so regenerated turns (T29) don't bleed into the filtered out so regenerated turns (T29) don't bleed into the
summary. summary.
T80.2: ``since_event_id`` clamps the result to event_log rows whose
``id >= since_event_id`` so callers needing a scene-scoped view (e.g.
thread detection on close) don't pull turns that landed before the
closing scene's ``scene_opened`` event.
""" """
cur = conn.execute( if since_event_id is None:
"SELECT kind, payload_json FROM event_log " cur = conn.execute(
"WHERE kind IN ('user_turn', 'assistant_turn') " "SELECT kind, payload_json FROM event_log "
" AND superseded_by IS NULL AND hidden = 0 " "WHERE kind IN ('user_turn', 'assistant_turn') "
"ORDER BY id DESC LIMIT ?", " AND superseded_by IS NULL AND hidden = 0 "
(limit,), "ORDER BY id DESC LIMIT ?",
) (limit,),
)
else:
cur = conn.execute(
"SELECT kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
" AND id >= ? "
"ORDER BY id DESC LIMIT ?",
(since_event_id, limit),
)
rows = list(reversed(cur.fetchall())) rows = list(reversed(cur.fetchall()))
out: list[dict] = [] out: list[dict] = []
for kind, payload_json in rows: for kind, payload_json in rows:
@@ -158,6 +177,65 @@ def _read_recent_dialogue(
return out return out
def _scene_opened_event_id(
conn: Connection, chat_id: str, scene_id: int
) -> int | None:
"""Return the event_log id of the ``scene_opened`` (or
``meanwhile_scene_started``) event that created scene row
``scene_id``. Used by T80.2 to lower-bound dialogue reads to a
single scene's transcript.
``meanwhile_scene_started`` carries an explicit ``scene_id`` so we
match on that directly. ``scene_opened`` doesn't, so we walk the
chat's scene rows in id order and zip against the chat's scene-open
events in id order — the projector creates one scene row per
scene-open event, so positions correspond.
Returns ``None`` when no matching event is found; callers should
treat that as "fall back to chat-wide" rather than over-filter.
"""
# Fast path for meanwhile children (explicit scene_id in payload).
for ev_id, payload_json in conn.execute(
"SELECT id, payload_json FROM event_log "
"WHERE kind = 'meanwhile_scene_started' "
" AND superseded_by IS NULL AND hidden = 0",
).fetchall():
try:
p = json.loads(payload_json)
except (TypeError, ValueError):
continue
if p.get("chat_id") == chat_id and p.get("scene_id") == scene_id:
return ev_id
# Fallback for parent you-scenes: zip chat-scoped scene-open events
# against chat-scoped scene rows in id order.
chat_scene_ids = [
r[0]
for r in conn.execute(
"SELECT id FROM scenes WHERE chat_id = ? ORDER BY id ASC",
(chat_id,),
).fetchall()
]
if scene_id not in chat_scene_ids:
return None
chat_open_evs: list[int] = []
for ev_id, _kind, payload_json in conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('scene_opened', 'meanwhile_scene_started') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id ASC",
).fetchall():
try:
p = json.loads(payload_json)
except (TypeError, ValueError):
continue
if p.get("chat_id") == chat_id:
chat_open_evs.append(ev_id)
idx = chat_scene_ids.index(scene_id)
if idx < len(chat_open_evs):
return chat_open_evs[idx]
return None
async def _summarize_and_apply_for_witness( async def _summarize_and_apply_for_witness(
conn: Connection, conn: Connection,
client: LLMClient, client: LLMClient,
@@ -487,16 +565,27 @@ async def apply_scene_close_summary(
}, },
) )
# T58.2: thread detection on close. Reuses the dialogue we already # T58.2: thread detection on close. Failure-tolerant: classify()
# gathered for per-POV summarization — same {speaker, text} shape # returns the empty default on retry-exhaustion, and the broad except
# detect_threads expects. Failure-tolerant: classify() returns the # below protects the close pipeline from any other classifier/mock
# empty default on retry-exhaustion, and the broad except below # flap.
# protects the close pipeline from any other classifier/mock flap. #
# T80.2: thread detection runs against a SCENE-SCOPED transcript,
# not the chat-wide last-50 turns used by the per-POV summaries.
# Mis-attributing threads when scene boundaries fall inside the last
# 50 turns would otherwise close threads opened in a prior scene.
scene_open_ev_id = _scene_opened_event_id(conn, chat_id, scene_id)
if scene_open_ev_id is not None:
scene_dialogue = _read_recent_dialogue(
conn, chat_id, since_event_id=scene_open_ev_id
)
else:
scene_dialogue = dialogue
try: try:
thread_result = await detect_threads( thread_result = await detect_threads(
client, client,
classifier_model=classifier_model, classifier_model=classifier_model,
scene_transcript=dialogue, scene_transcript=scene_dialogue,
open_threads=list_open_threads(conn, chat_id), open_threads=list_open_threads(conn, chat_id),
timeout_s=timeout_s, timeout_s=timeout_s,
) )
+135
View File
@@ -1490,3 +1490,138 @@ async def test_scene_close_re_run_does_not_double_suffix(tmp_path):
# from a row whose text already contained the suffix). # from a row whose text already contained the suffix).
inner_count = pov.count("Key quotes:") inner_count = pov.count("Key quotes:")
assert inner_count == 1 assert inner_count == 1
@pytest.mark.asyncio
async def test_thread_detection_uses_scene_scoped_transcript(
    tmp_path, monkeypatch
):
    """T80.2: closing a second scene must scope the thread transcript.

    With more than one closed scene in a chat, the second close has to
    pass ``detect_threads`` the second scene's turns only. The chat-wide
    last-50 window would drag in the first scene's transcript and risk
    mis-closing threads.
    """
    from chat.eventlog.log import append_and_apply
    from chat.services import thread_detection as td_mod

    chat_id = "chat_bot_a"
    summary_reply = json.dumps(
        {
            "summary": "BotA had a quick chat.",
            "knowledge_facts": [],
            "relationship_summary": "Steady.",
        }
    )

    # Record every transcript handed to the (stubbed) thread detector.
    seen_transcripts: list[list[dict]] = []

    async def capturing_detect_threads(client, **kwargs):
        seen_transcripts.append(list(kwargs["scene_transcript"]))
        return td_mod.ThreadDetectionResult()

    monkeypatch.setattr(td_mod, "detect_threads", capturing_detect_threads)

    db_path = tmp_path / "t.db"
    apply_migrations(db_path)
    with open_db(db_path) as conn:
        # Scene 1: seeded turns plus two unmistakable scene-1 markers.
        _seed_single_bot_scene(conn)
        scene_one_extras = [
            (
                "user_turn",
                {
                    "chat_id": chat_id,
                    "prose": "SCENE_ONE_USER_TURN",
                    "segments": [],
                },
            ),
            (
                "assistant_turn",
                {
                    "chat_id": chat_id,
                    "speaker_id": "bot_a",
                    "text": "SCENE_ONE_BOT_TURN",
                    "truncated": False,
                    "user_turn_id": 2,
                },
            ),
        ]
        for event_kind, event_payload in scene_one_extras:
            append_event(conn, kind=event_kind, payload=event_payload)
        project(conn)

        # Close scene 1.
        await apply_scene_close_summary(
            conn,
            MockLLMClient(canned=[summary_reply]),
            classifier_model="x",
            chat_id=chat_id,
            scene_id=1,
            host_bot_id="bot_a",
        )

        # Scene 2: append_and_apply projects each new event incrementally,
        # so the already-applied seed events are not re-run.
        scene_two_events = [
            (
                "scene_opened",
                {
                    "chat_id": chat_id,
                    "container_id": 1,
                    "started_at": "2026-04-26T21:00:00+00:00",
                    "participants": ["you", "bot_a"],
                },
            ),
            (
                "memory_written",
                {
                    "owner_id": "bot_a",
                    "chat_id": chat_id,
                    "scene_id": 2,
                    "pov_summary": "Original (scene 2)",
                    "witness_you": 1,
                    "witness_host": 1,
                    "witness_guest": 0,
                    "significance": 1,
                },
            ),
            (
                "user_turn",
                {
                    "chat_id": chat_id,
                    "prose": "SCENE_TWO_USER_TURN",
                    "segments": [],
                },
            ),
            (
                "assistant_turn",
                {
                    "chat_id": chat_id,
                    "speaker_id": "bot_a",
                    "text": "SCENE_TWO_BOT_TURN",
                    "truncated": False,
                    "user_turn_id": 3,
                },
            ),
        ]
        for event_kind, event_payload in scene_two_events:
            append_and_apply(conn, kind=event_kind, payload=event_payload)

        # Close scene 2 with a fresh mock client.
        await apply_scene_close_summary(
            conn,
            MockLLMClient(canned=[summary_reply]),
            classifier_model="x",
            chat_id=chat_id,
            scene_id=2,
            host_bot_id="bot_a",
        )

        # One capture per close; the second must carry scene-2 markers only.
        assert len(seen_transcripts) == 2
        second_close_transcript = seen_transcripts[1]
        joined = " ".join(
            turn.get("text", "") for turn in second_close_transcript
        )
        assert "SCENE_TWO" in joined
        assert "SCENE_ONE" not in joined