merge: T80 scene_summarize.py polish bundle (T58 follow-ups)

This commit is contained in:
Joseph Doherty
2026-04-26 21:52:44 -04:00
2 changed files with 680 additions and 18 deletions
+155 -18
View File
@@ -29,6 +29,7 @@ keeps moving.
from __future__ import annotations from __future__ import annotations
import json import json
import logging
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from sqlite3 import Connection from sqlite3 import Connection
@@ -39,6 +40,8 @@ from chat.eventlog.log import append_and_apply
from chat.llm.classify import classify from chat.llm.classify import classify
from chat.llm.client import LLMClient from chat.llm.client import LLMClient
_log = logging.getLogger(__name__)
class ScenePOVSummary(BaseModel): class ScenePOVSummary(BaseModel):
"""Classifier output: one witness's view of a closing scene. """Classifier output: one witness's view of a closing scene.
@@ -123,7 +126,11 @@ async def summarize_scene(
def _read_recent_dialogue( def _read_recent_dialogue(
conn: Connection, chat_id: str, *, limit: int = 50 conn: Connection,
chat_id: str,
*,
limit: int = 50,
since_event_id: int | None = None,
) -> list[dict]: ) -> list[dict]:
"""Pull the last ``limit`` user/assistant turns for ``chat_id``. """Pull the last ``limit`` user/assistant turns for ``chat_id``.
@@ -132,14 +139,29 @@ def _read_recent_dialogue(
the most recent turns of the chat. Superseded and hidden rows are the most recent turns of the chat. Superseded and hidden rows are
filtered out so regenerated turns (T29) don't bleed into the filtered out so regenerated turns (T29) don't bleed into the
summary. summary.
T80.2: ``since_event_id`` clamps the result to event_log rows whose
``id >= since_event_id`` so callers needing a scene-scoped view (e.g.
thread detection on close) don't pull turns that landed before the
closing scene's ``scene_opened`` event.
""" """
cur = conn.execute( if since_event_id is None:
"SELECT kind, payload_json FROM event_log " cur = conn.execute(
"WHERE kind IN ('user_turn', 'assistant_turn') " "SELECT kind, payload_json FROM event_log "
" AND superseded_by IS NULL AND hidden = 0 " "WHERE kind IN ('user_turn', 'assistant_turn') "
"ORDER BY id DESC LIMIT ?", " AND superseded_by IS NULL AND hidden = 0 "
(limit,), "ORDER BY id DESC LIMIT ?",
) (limit,),
)
else:
cur = conn.execute(
"SELECT kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
" AND id >= ? "
"ORDER BY id DESC LIMIT ?",
(since_event_id, limit),
)
rows = list(reversed(cur.fetchall())) rows = list(reversed(cur.fetchall()))
out: list[dict] = [] out: list[dict] = []
for kind, payload_json in rows: for kind, payload_json in rows:
@@ -158,6 +180,65 @@ def _read_recent_dialogue(
return out return out
def _scene_opened_event_id(
conn: Connection, chat_id: str, scene_id: int
) -> int | None:
"""Return the event_log id of the ``scene_opened`` (or
``meanwhile_scene_started``) event that created scene row
``scene_id``. Used by T80.2 to lower-bound dialogue reads to a
single scene's transcript.
``meanwhile_scene_started`` carries an explicit ``scene_id`` so we
match on that directly. ``scene_opened`` doesn't, so we walk the
chat's scene rows in id order and zip against the chat's scene-open
events in id order — the projector creates one scene row per
scene-open event, so positions correspond.
Returns ``None`` when no matching event is found; callers should
treat that as "fall back to chat-wide" rather than over-filter.
"""
# Fast path for meanwhile children (explicit scene_id in payload).
for ev_id, payload_json in conn.execute(
"SELECT id, payload_json FROM event_log "
"WHERE kind = 'meanwhile_scene_started' "
" AND superseded_by IS NULL AND hidden = 0",
).fetchall():
try:
p = json.loads(payload_json)
except (TypeError, ValueError):
continue
if p.get("chat_id") == chat_id and p.get("scene_id") == scene_id:
return ev_id
# Fallback for parent you-scenes: zip chat-scoped scene-open events
# against chat-scoped scene rows in id order.
chat_scene_ids = [
r[0]
for r in conn.execute(
"SELECT id FROM scenes WHERE chat_id = ? ORDER BY id ASC",
(chat_id,),
).fetchall()
]
if scene_id not in chat_scene_ids:
return None
chat_open_evs: list[int] = []
for ev_id, _kind, payload_json in conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('scene_opened', 'meanwhile_scene_started') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id ASC",
).fetchall():
try:
p = json.loads(payload_json)
except (TypeError, ValueError):
continue
if p.get("chat_id") == chat_id:
chat_open_evs.append(ev_id)
idx = chat_scene_ids.index(scene_id)
if idx < len(chat_open_evs):
return chat_open_evs[idx]
return None
async def _summarize_and_apply_for_witness( async def _summarize_and_apply_for_witness(
conn: Connection, conn: Connection,
client: LLMClient, client: LLMClient,
@@ -213,7 +294,11 @@ async def _summarize_and_apply_for_witness(
# Empty default -> skip the memory rewrite; the seeded # Empty default -> skip the memory rewrite; the seeded
# per-turn pov_summary stays in place. # per-turn pov_summary stays in place.
continue continue
new_value = pov.summary + key_quotes_suffix # T80.1: a prior close may have already appended a Key quotes
# suffix to this row's pov_summary. Strip it here so the fresh
# rewrite replaces the existing suffix rather than stacking a
# second one on top.
new_value = _strip_key_quotes_suffix(pov.summary) + key_quotes_suffix
append_and_apply( append_and_apply(
conn, conn,
kind="manual_edit", kind="manual_edit",
@@ -263,6 +348,31 @@ async def _summarize_and_apply_for_witness(
return pov return pov
# T80.1: header marker shared by the suffix builder and the
# witness-write strip step. Any text starting with this marker is treated
# as a previously-appended Key quotes suffix and stripped before reuse so
# repeated scene closes don't compose recursive bloat.
_KEY_QUOTES_HEADER = "\n\nKey quotes:\n"
def _strip_key_quotes_suffix(text: str) -> str:
"""Remove a previously-appended Key quotes suffix from ``text``.
Returns ``text`` unchanged when the marker is absent, or the prefix
up to (but not including) the marker when present. Used in two
places: (1) when sourcing quote text from a memory row that may
already carry the suffix from a prior close, and (2) when computing
the per-POV rewrite's prior_value so the new write replaces — rather
than stacks on — the old suffix.
"""
if not text:
return text
idx = text.find(_KEY_QUOTES_HEADER)
if idx >= 0:
return text[:idx]
return text
def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str: def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
"""If the scene's max-turn-significance is >= 2, build the """If the scene's max-turn-significance is >= 2, build the
"Key quotes:" suffix from the top-3 highest-significance memory rows "Key quotes:" suffix from the top-3 highest-significance memory rows
@@ -274,6 +384,10 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
per-turn narrative seeded by T21, since this helper is called BEFORE per-turn narrative seeded by T21, since this helper is called BEFORE
the per-POV rewrite. Texts are truncated to 200 chars to bound the per-POV rewrite. Texts are truncated to 200 chars to bound
memory row growth across many witnesses. memory row growth across many witnesses.
T80.1: candidate text is run through :func:`_strip_key_quotes_suffix`
first so a re-close (whose source memories already carry a suffix from
the prior close) doesn't quote a quote.
""" """
row = conn.execute( row = conn.execute(
"SELECT MAX(significance) FROM memories WHERE scene_id = ?", "SELECT MAX(significance) FROM memories WHERE scene_id = ?",
@@ -288,7 +402,7 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
(scene_id,), (scene_id,),
) )
quotes = [ quotes = [
(r[0] or "")[:200] _strip_key_quotes_suffix(r[0] or "")[:200]
for r in cur.fetchall() for r in cur.fetchall()
] ]
if not quotes: if not quotes:
@@ -454,20 +568,35 @@ async def apply_scene_close_summary(
}, },
) )
# T58.2: thread detection on close. Reuses the dialogue we already # T58.2: thread detection on close. Failure-tolerant: classify()
# gathered for per-POV summarization — same {speaker, text} shape # returns the empty default on retry-exhaustion, and the broad except
# detect_threads expects. Failure-tolerant: classify() returns the # below protects the close pipeline from any other classifier/mock
# empty default on retry-exhaustion, and the broad except below # flap.
# protects the close pipeline from any other classifier/mock flap. #
# T80.2: thread detection runs against a SCENE-SCOPED transcript,
# not the chat-wide last-50 turns used by the per-POV summaries.
# Mis-attributing threads when scene boundaries fall inside the last
# 50 turns would otherwise close threads opened in a prior scene.
scene_open_ev_id = _scene_opened_event_id(conn, chat_id, scene_id)
if scene_open_ev_id is not None:
scene_dialogue = _read_recent_dialogue(
conn, chat_id, since_event_id=scene_open_ev_id
)
else:
scene_dialogue = dialogue
try: try:
thread_result = await detect_threads( thread_result = await detect_threads(
client, client,
classifier_model=classifier_model, classifier_model=classifier_model,
scene_transcript=dialogue, scene_transcript=scene_dialogue,
open_threads=list_open_threads(conn, chat_id), open_threads=list_open_threads(conn, chat_id),
timeout_s=timeout_s, timeout_s=timeout_s,
) )
except Exception: except Exception as exc:
# T80.3: log the swallowed exception at DEBUG so a
# programmer-error flap (e.g. wrong kwarg name) surfaces in
# local logs without breaking the close pipeline.
_log.debug("detect_threads failed: %s", exc, exc_info=True)
from chat.services.thread_detection import ThreadDetectionResult from chat.services.thread_detection import ThreadDetectionResult
thread_result = ThreadDetectionResult() thread_result = ThreadDetectionResult()
@@ -495,12 +624,20 @@ async def apply_scene_close_summary(
}, },
) )
elif cand.action == "close" and cand.existing_thread_id: elif cand.action == "close" and cand.existing_thread_id:
# T80.4: chat-clock time, not wall clock — the rest of the
# close pipeline (memories, edges, scene_closed payloads)
# uses chat["time"] so threads must agree. Falls back to
# UTC now only when the chat row has no clock yet (defensive
# — chat_state always seeds "time" via chat_created).
chat_clock_at = chat.get("time") or datetime.now(
timezone.utc
).isoformat()
append_and_apply( append_and_apply(
conn, conn,
kind="thread_closed", kind="thread_closed",
payload={ payload={
"thread_id": cand.existing_thread_id, "thread_id": cand.existing_thread_id,
"closed_at": datetime.now(timezone.utc).isoformat(), "closed_at": chat_clock_at,
}, },
) )
+525
View File
@@ -1418,3 +1418,528 @@ def test_consumed_digest_does_not_render_again(tmp_path):
body2 = msgs2[0].content body2 = msgs2[0].content
assert "Meanwhile while you were away:" not in body2 assert "Meanwhile while you were away:" not in body2
assert digest_text not in body2 assert digest_text not in body2
# ---------------------------------------------------------------------------
# T80: scene_summarize polish bundle.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_scene_close_re_run_does_not_double_suffix(tmp_path):
"""T80.1: re-running ``apply_scene_close_summary`` on the same scene
must NOT stack a second "Key quotes:" suffix on each pov_summary. The
builder strips any existing suffix from candidate text before
composing the new one, and the per-POV write replaces (not appends
to) the existing suffix.
"""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk with you.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
# Significance >= 2 triggers the Key quotes suffix path.
_seed_memory(conn, pov_summary="Maya quote one", significance=3)
_seed_memory(conn, pov_summary="Maya quote two", significance=2)
project(conn)
# First close.
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows
for (pov,) in rows:
assert pov.count("Key quotes:") == 1
# Second close on the same scene with fresh canned responses.
client2 = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client2,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows2 = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows2
for (pov,) in rows2:
# Still exactly ONE "Key quotes:" suffix — no recursive bloat.
assert pov.count("Key quotes:") == 1
# And no nested-quote artifacts (the suffix wasn't sourced
# from a row whose text already contained the suffix).
inner_count = pov.count("Key quotes:")
assert inner_count == 1
@pytest.mark.asyncio
async def test_thread_detection_uses_scene_scoped_transcript(
tmp_path, monkeypatch
):
"""T80.2: when a chat has multiple closed scenes, the second scene's
close must hand ``detect_threads`` ONLY the second scene's turns —
not the chat-wide last-50, which would bleed in the first scene's
transcript and risk mis-closing threads."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
captured_transcripts: list[list[dict]] = []
async def capturing_detect_threads(client, **kwargs):
captured_transcripts.append(list(kwargs["scene_transcript"]))
return td_mod.ThreadDetectionResult()
monkeypatch.setattr(td_mod, "detect_threads", capturing_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
# Seed scene 1 + 3 turns + close.
_seed_single_bot_scene(conn)
# Add two extra distinct turns inside scene 1 so the transcript
# has clearly-scene-1 markers we can assert on.
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "SCENE_ONE_USER_TURN",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "SCENE_ONE_BOT_TURN",
"truncated": False,
"user_turn_id": 2,
},
)
project(conn)
# Close scene 1.
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Open scene 2 with distinct dialogue. Use append_and_apply so
# the new events project incrementally without re-running the
# already-applied seed events.
from chat.eventlog.log import append_and_apply
append_and_apply(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T21:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_and_apply(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 2,
"pov_summary": "Original (scene 2)",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"significance": 1,
},
)
append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "SCENE_TWO_USER_TURN",
"segments": [],
},
)
append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "SCENE_TWO_BOT_TURN",
"truncated": False,
"user_turn_id": 3,
},
)
# Close scene 2.
client2 = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client2,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=2,
host_bot_id="bot_a",
)
# The second close's transcript holds only scene-2 markers.
assert len(captured_transcripts) == 2
scene_two_transcript = captured_transcripts[1]
joined = " ".join(t.get("text", "") for t in scene_two_transcript)
assert "SCENE_TWO" in joined
assert "SCENE_ONE" not in joined
@pytest.mark.asyncio
async def test_detect_threads_failure_is_logged(tmp_path, monkeypatch, caplog):
"""T80.3: when ``detect_threads`` raises, the broad except must log
the failure at DEBUG so a programmer-error flap surfaces in local
logs even though the close pipeline keeps moving."""
import logging
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def boom(client, **kwargs):
raise RuntimeError("test-detect-threads-boom")
monkeypatch.setattr(td_mod, "detect_threads", boom)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
caplog.set_level(logging.DEBUG, logger="chat.services.scene_summarize")
client = MockLLMClient(canned=[canned])
# Close should NOT raise even though detect_threads did.
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Log carries the error message.
assert any(
"detect_threads failed" in rec.message
and "test-detect-threads-boom" in rec.message
for rec in caplog.records
), [r.message for r in caplog.records]
@pytest.mark.asyncio
async def test_thread_closed_uses_chat_clock_time(tmp_path, monkeypatch):
"""T80.4: emitted ``thread_closed`` events stamp ``closed_at`` with
the chat-clock time (chat["time"]), not the host's wall clock. The
rest of the close pipeline already does this; threads must agree
so timeline reconstruction stays consistent."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="close",
existing_thread_id="thr_x",
summary="resolved",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
# Pre-seed an open thread so the "close" candidate has something
# real to close, and pin the chat clock to a known value.
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "What did Maya hide?",
},
)
project(conn)
# UPDATE chat_state AFTER project so the re-projection doesn't
# overwrite the pinned clock value.
chat_clock = "2026-04-26T10:00:00+00:00"
conn.execute(
"UPDATE chat_state SET time = ? WHERE chat_id = ?",
(chat_clock, "chat_bot_a"),
)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_closed'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
assert payload["closed_at"] == chat_clock
# ---------------------------------------------------------------------------
# T80.5: T58 coverage gaps (truncation, thread update/close emissions).
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_key_quote_truncation_at_200_chars(tmp_path):
"""T80.5: when a memory's pov_summary exceeds 200 chars, the
Key-quote bullet truncates the source text to exactly 200 chars
(no ellipsis — a hard slice, per the existing T58 implementation)."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
long_text = "X" * 500 # 500 X's; expected slice is 200 X's.
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
_seed_memory(conn, pov_summary=long_text, significance=2)
project(conn)
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
new_pov = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchone()[0]
assert "Key quotes:" in new_pov
# The bullet should contain exactly 200 X's, not 500.
# Format from _build_key_quotes_suffix: ``- "<text>"``.
bullet_marker = '- "'
idx = new_pov.index(bullet_marker)
# Count consecutive X's after the bullet marker.
x_run = 0
for ch in new_pov[idx + len(bullet_marker):]:
if ch == "X":
x_run += 1
else:
break
assert x_run == 200, (
f"expected 200-char truncation, got {x_run}"
)
@pytest.mark.asyncio
async def test_thread_detection_update_candidate_emits_thread_updated(
tmp_path, monkeypatch
):
"""T80.5: a detect_threads ``update`` candidate produces a
``thread_updated`` event with the candidate's summary and a
last_referenced_scene_id pointing at the closed scene."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="update",
existing_thread_id="thr_x",
summary="updated summary",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
# Pre-seed the open thread so the update has a row to target.
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "old summary",
},
)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_updated'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
assert payload["summary"] == "updated summary"
assert payload["last_referenced_scene_id"] == 1
@pytest.mark.asyncio
async def test_thread_detection_close_candidate_emits_thread_closed(
tmp_path, monkeypatch
):
"""T80.5: a detect_threads ``close`` candidate produces a
``thread_closed`` event for the existing thread."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="close",
existing_thread_id="thr_x",
summary="resolved",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "open",
},
)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_closed'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
# closed_at field is present (T80.4 verifies its value).
assert "closed_at" in payload