diff --git a/chat/services/scene_summarize.py b/chat/services/scene_summarize.py index 386cf51..7551f8b 100644 --- a/chat/services/scene_summarize.py +++ b/chat/services/scene_summarize.py @@ -29,6 +29,7 @@ keeps moving. from __future__ import annotations import json +import logging import uuid from datetime import datetime, timezone from sqlite3 import Connection @@ -39,6 +40,8 @@ from chat.eventlog.log import append_and_apply from chat.llm.classify import classify from chat.llm.client import LLMClient +_log = logging.getLogger(__name__) + class ScenePOVSummary(BaseModel): """Classifier output: one witness's view of a closing scene. @@ -123,7 +126,11 @@ async def summarize_scene( def _read_recent_dialogue( - conn: Connection, chat_id: str, *, limit: int = 50 + conn: Connection, + chat_id: str, + *, + limit: int = 50, + since_event_id: int | None = None, ) -> list[dict]: """Pull the last ``limit`` user/assistant turns for ``chat_id``. @@ -132,14 +139,29 @@ def _read_recent_dialogue( the most recent turns of the chat. Superseded and hidden rows are filtered out so regenerated turns (T29) don't bleed into the summary. + + T80.2: ``since_event_id`` clamps the result to event_log rows whose + ``id >= since_event_id`` so callers needing a scene-scoped view (e.g. + thread detection on close) don't pull turns that landed before the + closing scene's ``scene_opened`` event. 
""" - cur = conn.execute( - "SELECT kind, payload_json FROM event_log " - "WHERE kind IN ('user_turn', 'assistant_turn') " - " AND superseded_by IS NULL AND hidden = 0 " - "ORDER BY id DESC LIMIT ?", - (limit,), - ) + if since_event_id is None: + cur = conn.execute( + "SELECT kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'assistant_turn') " + " AND superseded_by IS NULL AND hidden = 0 " + "ORDER BY id DESC LIMIT ?", + (limit,), + ) + else: + cur = conn.execute( + "SELECT kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'assistant_turn') " + " AND superseded_by IS NULL AND hidden = 0 " + " AND id >= ? " + "ORDER BY id DESC LIMIT ?", + (since_event_id, limit), + ) rows = list(reversed(cur.fetchall())) out: list[dict] = [] for kind, payload_json in rows: @@ -158,6 +180,65 @@ def _read_recent_dialogue( return out +def _scene_opened_event_id( + conn: Connection, chat_id: str, scene_id: int +) -> int | None: + """Return the event_log id of the ``scene_opened`` (or + ``meanwhile_scene_started``) event that created scene row + ``scene_id``. Used by T80.2 to lower-bound dialogue reads to a + single scene's transcript. + + ``meanwhile_scene_started`` carries an explicit ``scene_id`` so we + match on that directly. ``scene_opened`` doesn't, so we walk the + chat's scene rows in id order and zip against the chat's scene-open + events in id order — the projector creates one scene row per + scene-open event, so positions correspond. + + Returns ``None`` when no matching event is found; callers should + treat that as "fall back to chat-wide" rather than over-filter. + """ + # Fast path for meanwhile children (explicit scene_id in payload). 
+ for ev_id, payload_json in conn.execute( + "SELECT id, payload_json FROM event_log " + "WHERE kind = 'meanwhile_scene_started' " + " AND superseded_by IS NULL AND hidden = 0", + ).fetchall(): + try: + p = json.loads(payload_json) + except (TypeError, ValueError): + continue + if p.get("chat_id") == chat_id and p.get("scene_id") == scene_id: + return ev_id + # Fallback for parent you-scenes: zip chat-scoped scene-open events + # against chat-scoped scene rows in id order. + chat_scene_ids = [ + r[0] + for r in conn.execute( + "SELECT id FROM scenes WHERE chat_id = ? ORDER BY id ASC", + (chat_id,), + ).fetchall() + ] + if scene_id not in chat_scene_ids: + return None + chat_open_evs: list[int] = [] + for ev_id, _kind, payload_json in conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE kind IN ('scene_opened', 'meanwhile_scene_started') " + " AND superseded_by IS NULL AND hidden = 0 " + "ORDER BY id ASC", + ).fetchall(): + try: + p = json.loads(payload_json) + except (TypeError, ValueError): + continue + if p.get("chat_id") == chat_id: + chat_open_evs.append(ev_id) + idx = chat_scene_ids.index(scene_id) + if idx < len(chat_open_evs): + return chat_open_evs[idx] + return None + + async def _summarize_and_apply_for_witness( conn: Connection, client: LLMClient, @@ -213,7 +294,11 @@ async def _summarize_and_apply_for_witness( # Empty default -> skip the memory rewrite; the seeded # per-turn pov_summary stays in place. continue - new_value = pov.summary + key_quotes_suffix + # T80.1: a prior close may have already appended a Key quotes + # suffix to this row's pov_summary. Strip it here so the fresh + # rewrite replaces the existing suffix rather than stacking a + # second one on top. 
+ new_value = _strip_key_quotes_suffix(pov.summary) + key_quotes_suffix append_and_apply( conn, kind="manual_edit", @@ -263,6 +348,31 @@ async def _summarize_and_apply_for_witness( return pov +# T80.1: header marker shared by the suffix builder and the +# witness-write strip step. Any text starting with this marker is treated +# as a previously-appended Key quotes suffix and stripped before reuse so +# repeated scene closes don't compose recursive bloat. +_KEY_QUOTES_HEADER = "\n\nKey quotes:\n" + + +def _strip_key_quotes_suffix(text: str) -> str: + """Remove a previously-appended Key quotes suffix from ``text``. + + Returns ``text`` unchanged when the marker is absent, or the prefix + up to (but not including) the marker when present. Used in two + places: (1) when sourcing quote text from a memory row that may + already carry the suffix from a prior close, and (2) when computing + the per-POV rewrite's new_value so the new write replaces — rather + than stacks on — the old suffix. + """ + if not text: + return text + idx = text.find(_KEY_QUOTES_HEADER) + if idx >= 0: + return text[:idx] + return text + + def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str: """If the scene's max-turn-significance is >= 2, build the "Key quotes:" suffix from the top-3 highest-significance memory rows @@ -274,6 +384,10 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str: per-turn narrative seeded by T21, since this helper is called BEFORE the per-POV rewrite. Texts are truncated to 200 chars to bound memory row growth across many witnesses. + + T80.1: candidate text is run through :func:`_strip_key_quotes_suffix` + first so a re-close (whose source memories already carry a suffix from + the prior close) doesn't quote a quote. 
""" row = conn.execute( "SELECT MAX(significance) FROM memories WHERE scene_id = ?", @@ -288,7 +402,7 @@ def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str: (scene_id,), ) quotes = [ - (r[0] or "")[:200] + _strip_key_quotes_suffix(r[0] or "")[:200] for r in cur.fetchall() ] if not quotes: @@ -454,20 +568,35 @@ async def apply_scene_close_summary( }, ) - # T58.2: thread detection on close. Reuses the dialogue we already - # gathered for per-POV summarization — same {speaker, text} shape - # detect_threads expects. Failure-tolerant: classify() returns the - # empty default on retry-exhaustion, and the broad except below - # protects the close pipeline from any other classifier/mock flap. + # T58.2: thread detection on close. Failure-tolerant: classify() + # returns the empty default on retry-exhaustion, and the broad except + # below protects the close pipeline from any other classifier/mock + # flap. + # + # T80.2: thread detection runs against a SCENE-SCOPED transcript, + # not the chat-wide last-50 turns used by the per-POV summaries. + # Mis-attributing threads when scene boundaries fall inside the last + # 50 turns would otherwise close threads opened in a prior scene. + scene_open_ev_id = _scene_opened_event_id(conn, chat_id, scene_id) + if scene_open_ev_id is not None: + scene_dialogue = _read_recent_dialogue( + conn, chat_id, since_event_id=scene_open_ev_id + ) + else: + scene_dialogue = dialogue try: thread_result = await detect_threads( client, classifier_model=classifier_model, - scene_transcript=dialogue, + scene_transcript=scene_dialogue, open_threads=list_open_threads(conn, chat_id), timeout_s=timeout_s, ) - except Exception: + except Exception as exc: + # T80.3: log the swallowed exception at DEBUG so a + # programmer-error flap (e.g. wrong kwarg name) surfaces in + # local logs without breaking the close pipeline. 
+ _log.debug("detect_threads failed: %s", exc, exc_info=True) from chat.services.thread_detection import ThreadDetectionResult thread_result = ThreadDetectionResult() @@ -495,12 +624,20 @@ async def apply_scene_close_summary( }, ) elif cand.action == "close" and cand.existing_thread_id: + # T80.4: chat-clock time, not wall clock — the rest of the + # close pipeline (memories, edges, scene_closed payloads) + # uses chat["time"] so threads must agree. Falls back to + # UTC now only when the chat row has no clock yet (defensive + # — chat_state always seeds "time" via chat_created). + chat_clock_at = chat.get("time") or datetime.now( + timezone.utc + ).isoformat() append_and_apply( conn, kind="thread_closed", payload={ "thread_id": cand.existing_thread_id, - "closed_at": datetime.now(timezone.utc).isoformat(), + "closed_at": chat_clock_at, }, ) diff --git a/tests/test_per_pov_summary.py b/tests/test_per_pov_summary.py index 6984b5c..3ed22ac 100644 --- a/tests/test_per_pov_summary.py +++ b/tests/test_per_pov_summary.py @@ -1418,3 +1418,528 @@ def test_consumed_digest_does_not_render_again(tmp_path): body2 = msgs2[0].content assert "Meanwhile while you were away:" not in body2 assert digest_text not in body2 + + +# --------------------------------------------------------------------------- +# T80: scene_summarize polish bundle. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_scene_close_re_run_does_not_double_suffix(tmp_path): + """T80.1: re-running ``apply_scene_close_summary`` on the same scene + must NOT stack a second "Key quotes:" suffix on each pov_summary. The + builder strips any existing suffix from candidate text before + composing the new one, and the per-POV write replaces (not appends + to) the existing suffix. 
+ """ + db = tmp_path / "t.db" + apply_migrations(db) + canned = json.dumps( + { + "summary": "BotA had a heavy talk with you.", + "knowledge_facts": [], + "relationship_summary": "Things shifted.", + } + ) + no_threads = json.dumps({"candidates": []}) + with open_db(db) as conn: + _seed_single_bot_scene_no_memory(conn) + # Significance >= 2 triggers the Key quotes suffix path. + _seed_memory(conn, pov_summary="Maya quote one", significance=3) + _seed_memory(conn, pov_summary="Maya quote two", significance=2) + project(conn) + + # First close. + client = MockLLMClient(canned=[canned, no_threads]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows = conn.execute( + "SELECT pov_summary FROM memories WHERE scene_id = 1" + ).fetchall() + assert rows + for (pov,) in rows: + assert pov.count("Key quotes:") == 1 + + # Second close on the same scene with fresh canned responses. + client2 = MockLLMClient(canned=[canned, no_threads]) + await apply_scene_close_summary( + conn, + client2, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows2 = conn.execute( + "SELECT pov_summary FROM memories WHERE scene_id = 1" + ).fetchall() + assert rows2 + for (pov,) in rows2: + # Still exactly ONE "Key quotes:" suffix — no recursive bloat. + assert pov.count("Key quotes:") == 1 + # And no nested-quote artifacts (the suffix wasn't sourced + # from a row whose text already contained the suffix). 
+ inner_count = pov.count("Key quotes:") + assert inner_count == 1 + + +@pytest.mark.asyncio +async def test_thread_detection_uses_scene_scoped_transcript( + tmp_path, monkeypatch +): + """T80.2: when a chat has multiple closed scenes, the second scene's + close must hand ``detect_threads`` ONLY the second scene's turns — + not the chat-wide last-50, which would bleed in the first scene's + transcript and risk mis-closing threads.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + captured_transcripts: list[list[dict]] = [] + + async def capturing_detect_threads(client, **kwargs): + captured_transcripts.append(list(kwargs["scene_transcript"])) + return td_mod.ThreadDetectionResult() + + monkeypatch.setattr(td_mod, "detect_threads", capturing_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + # Seed scene 1 + 3 turns + close. + _seed_single_bot_scene(conn) + # Add two extra distinct turns inside scene 1 so the transcript + # has clearly-scene-1 markers we can assert on. + append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_bot_a", + "prose": "SCENE_ONE_USER_TURN", + "segments": [], + }, + ) + append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "SCENE_ONE_BOT_TURN", + "truncated": False, + "user_turn_id": 2, + }, + ) + project(conn) + + # Close scene 1. + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + # Open scene 2 with distinct dialogue. Use append_and_apply so + # the new events project incrementally without re-running the + # already-applied seed events. 
+ from chat.eventlog.log import append_and_apply + + append_and_apply( + conn, + kind="scene_opened", + payload={ + "chat_id": "chat_bot_a", + "container_id": 1, + "started_at": "2026-04-26T21:00:00+00:00", + "participants": ["you", "bot_a"], + }, + ) + append_and_apply( + conn, + kind="memory_written", + payload={ + "owner_id": "bot_a", + "chat_id": "chat_bot_a", + "scene_id": 2, + "pov_summary": "Original (scene 2)", + "witness_you": 1, + "witness_host": 1, + "witness_guest": 0, + "significance": 1, + }, + ) + append_and_apply( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_bot_a", + "prose": "SCENE_TWO_USER_TURN", + "segments": [], + }, + ) + append_and_apply( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "SCENE_TWO_BOT_TURN", + "truncated": False, + "user_turn_id": 3, + }, + ) + + # Close scene 2. + client2 = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client2, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=2, + host_bot_id="bot_a", + ) + + # The second close's transcript holds only scene-2 markers. 
+ assert len(captured_transcripts) == 2 + scene_two_transcript = captured_transcripts[1] + joined = " ".join(t.get("text", "") for t in scene_two_transcript) + assert "SCENE_TWO" in joined + assert "SCENE_ONE" not in joined + + +@pytest.mark.asyncio +async def test_detect_threads_failure_is_logged(tmp_path, monkeypatch, caplog): + """T80.3: when ``detect_threads`` raises, the broad except must log + the failure at DEBUG so a programmer-error flap surfaces in local + logs even though the close pipeline keeps moving.""" + import logging + + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def boom(client, **kwargs): + raise RuntimeError("test-detect-threads-boom") + + monkeypatch.setattr(td_mod, "detect_threads", boom) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + project(conn) + + caplog.set_level(logging.DEBUG, logger="chat.services.scene_summarize") + client = MockLLMClient(canned=[canned]) + # Close should NOT raise even though detect_threads did. + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + # Log carries the error message. + assert any( + "detect_threads failed" in rec.message + and "test-detect-threads-boom" in rec.message + for rec in caplog.records + ), [r.message for r in caplog.records] + + +@pytest.mark.asyncio +async def test_thread_closed_uses_chat_clock_time(tmp_path, monkeypatch): + """T80.4: emitted ``thread_closed`` events stamp ``closed_at`` with + the chat-clock time (chat["time"]), not the host's wall clock. 
The + rest of the close pipeline already does this; threads must agree + so timeline reconstruction stays consistent.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def fake_detect_threads(client, **kwargs): + return td_mod.ThreadDetectionResult( + candidates=[ + td_mod.ThreadCandidate( + action="close", + existing_thread_id="thr_x", + summary="resolved", + ), + ] + ) + + monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + # Pre-seed an open thread so the "close" candidate has something + # real to close, and pin the chat clock to a known value. + from chat.eventlog.log import append_and_apply + import chat.state.threads # noqa: F401 + + append_and_apply( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_x", + "chat_id": "chat_bot_a", + "title": "Lingering question", + "summary": "What did Maya hide?", + }, + ) + project(conn) + # UPDATE chat_state AFTER project so the re-projection doesn't + # overwrite the pinned clock value. + chat_clock = "2026-04-26T10:00:00+00:00" + conn.execute( + "UPDATE chat_state SET time = ? WHERE chat_id = ?", + (chat_clock, "chat_bot_a"), + ) + + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'thread_closed'" + ).fetchall() + assert len(rows) == 1 + payload = json.loads(rows[0][0]) + assert payload["thread_id"] == "thr_x" + assert payload["closed_at"] == chat_clock + + +# --------------------------------------------------------------------------- +# T80.5: T58 coverage gaps (truncation, thread update/close emissions). 
+# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_key_quote_truncation_at_200_chars(tmp_path): + """T80.5: when a memory's pov_summary exceeds 200 chars, the + Key-quote bullet truncates the source text to exactly 200 chars + (no ellipsis — a hard slice, per the existing T58 implementation).""" + db = tmp_path / "t.db" + apply_migrations(db) + canned = json.dumps( + { + "summary": "BotA had a heavy talk.", + "knowledge_facts": [], + "relationship_summary": "Things shifted.", + } + ) + no_threads = json.dumps({"candidates": []}) + long_text = "X" * 500 # 500 X's; expected slice is 200 X's. + with open_db(db) as conn: + _seed_single_bot_scene_no_memory(conn) + _seed_memory(conn, pov_summary=long_text, significance=2) + project(conn) + + client = MockLLMClient(canned=[canned, no_threads]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + new_pov = conn.execute( + "SELECT pov_summary FROM memories WHERE scene_id = 1" + ).fetchone()[0] + assert "Key quotes:" in new_pov + # The bullet should contain exactly 200 X's, not 500. + # Format from _build_key_quotes_suffix: ``- "{text}"``. + bullet_marker = '- "' + idx = new_pov.index(bullet_marker) + # Count consecutive X's after the bullet marker. 
+ x_run = 0 + for ch in new_pov[idx + len(bullet_marker):]: + if ch == "X": + x_run += 1 + else: + break + assert x_run == 200, ( + f"expected 200-char truncation, got {x_run}" + ) + + +@pytest.mark.asyncio +async def test_thread_detection_update_candidate_emits_thread_updated( + tmp_path, monkeypatch +): + """T80.5: a detect_threads ``update`` candidate produces a + ``thread_updated`` event with the candidate's summary and a + last_referenced_scene_id pointing at the closed scene.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def fake_detect_threads(client, **kwargs): + return td_mod.ThreadDetectionResult( + candidates=[ + td_mod.ThreadCandidate( + action="update", + existing_thread_id="thr_x", + summary="updated summary", + ), + ] + ) + + monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + from chat.eventlog.log import append_and_apply + import chat.state.threads # noqa: F401 + + # Pre-seed the open thread so the update has a row to target. 
+ append_and_apply( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_x", + "chat_id": "chat_bot_a", + "title": "Lingering question", + "summary": "old summary", + }, + ) + project(conn) + + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + rows = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'thread_updated'" + ).fetchall() + assert len(rows) == 1 + payload = json.loads(rows[0][0]) + assert payload["thread_id"] == "thr_x" + assert payload["summary"] == "updated summary" + assert payload["last_referenced_scene_id"] == 1 + + +@pytest.mark.asyncio +async def test_thread_detection_close_candidate_emits_thread_closed( + tmp_path, monkeypatch +): + """T80.5: a detect_threads ``close`` candidate produces a + ``thread_closed`` event for the existing thread.""" + from chat.services import thread_detection as td_mod + + canned = json.dumps( + { + "summary": "BotA had a quick chat.", + "knowledge_facts": [], + "relationship_summary": "Steady.", + } + ) + + async def fake_detect_threads(client, **kwargs): + return td_mod.ThreadDetectionResult( + candidates=[ + td_mod.ThreadCandidate( + action="close", + existing_thread_id="thr_x", + summary="resolved", + ), + ] + ) + + monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads) + + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_single_bot_scene(conn) + from chat.eventlog.log import append_and_apply + import chat.state.threads # noqa: F401 + + append_and_apply( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_x", + "chat_id": "chat_bot_a", + "title": "Lingering question", + "summary": "open", + }, + ) + project(conn) + + client = MockLLMClient(canned=[canned]) + await apply_scene_close_summary( + conn, + client, + classifier_model="x", + chat_id="chat_bot_a", + scene_id=1, + host_bot_id="bot_a", + ) + + 
rows = conn.execute( + "SELECT payload_json FROM event_log WHERE kind = 'thread_closed'" + ).fetchall() + assert len(rows) == 1 + payload = json.loads(rows[0][0]) + assert payload["thread_id"] == "thr_x" + # closed_at field is present (T80.4 verifies its value). + assert "closed_at" in payload