merge: T58 scene compression + thread emission on close

This commit is contained in:
Joseph Doherty
2026-04-26 20:21:01 -04:00
2 changed files with 357 additions and 6 deletions
+102 -1
View File
@@ -29,6 +29,8 @@ keeps moving.
from __future__ import annotations from __future__ import annotations
import json import json
import uuid
from datetime import datetime, timezone
from sqlite3 import Connection from sqlite3 import Connection
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@@ -167,6 +169,7 @@ async def _summarize_and_apply_for_witness(
you_name: str, you_name: str,
dialogue: list[dict], dialogue: list[dict],
timeout_s: float, timeout_s: float,
key_quotes_suffix: str = "",
) -> ScenePOVSummary: ) -> ScenePOVSummary:
"""Run :func:`summarize_scene` for one bot witness and apply the """Run :func:`summarize_scene` for one bot witness and apply the
three projected updates (memory pov_summary rewrite, edge summary three projected updates (memory pov_summary rewrite, edge summary
@@ -175,6 +178,10 @@ async def _summarize_and_apply_for_witness(
Tolerant of missing pieces in the same way Phase 1 was: no memory Tolerant of missing pieces in the same way Phase 1 was: no memory
row -> skip the rewrite; no edge row -> skip the edge_summary write row -> skip the rewrite; no edge row -> skip the edge_summary write
(the empty-default classifier output simply yields no rewrites). (the empty-default classifier output simply yields no rewrites).
``key_quotes_suffix`` is appended verbatim to the per-POV summary
text before the rewrite lands (T58.1) — empty string is the no-op
default for low-significance scenes.
""" """
from chat.state.edges import get_edge from chat.state.edges import get_edge
from chat.state.entities import get_bot from chat.state.entities import get_bot
@@ -206,6 +213,7 @@ async def _summarize_and_apply_for_witness(
# Empty default -> skip the memory rewrite; the seeded # Empty default -> skip the memory rewrite; the seeded
# per-turn pov_summary stays in place. # per-turn pov_summary stays in place.
continue continue
new_value = pov.summary + key_quotes_suffix
append_and_apply( append_and_apply(
conn, conn,
kind="manual_edit", kind="manual_edit",
@@ -213,7 +221,7 @@ async def _summarize_and_apply_for_witness(
"target_kind": "memory_pov_summary", "target_kind": "memory_pov_summary",
"target_id": int(memory_id), "target_id": int(memory_id),
"prior_value": prior_pov, "prior_value": prior_pov,
"new_value": pov.summary, "new_value": new_value,
}, },
) )
@@ -255,6 +263,40 @@ async def _summarize_and_apply_for_witness(
return pov return pov
def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
"""If the scene's max-turn-significance is >= 2, build the
"Key quotes:" suffix from the top-3 highest-significance memory rows
(per requirements §11.1). Otherwise return the empty string so the
per-POV summaries collapse fully (low-significance scenes lose all
raw text in favor of the classifier rewrite).
Quote source is each memory's current ``pov_summary`` — the raw
per-turn narrative seeded by T21, since this helper is called BEFORE
the per-POV rewrite. Texts are truncated to 200 chars to bound
memory row growth across many witnesses.
"""
row = conn.execute(
"SELECT MAX(significance) FROM memories WHERE scene_id = ?",
(scene_id,),
).fetchone()
max_sig = (row[0] if row else None) or 0
if max_sig < 2:
return ""
cur = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = ? "
"ORDER BY significance DESC, id ASC LIMIT 3",
(scene_id,),
)
quotes = [
(r[0] or "")[:200]
for r in cur.fetchall()
]
if not quotes:
return ""
lines = "\n".join(f'- "{q}"' for q in quotes)
return f"\n\nKey quotes:\n{lines}"
async def apply_scene_close_summary( async def apply_scene_close_summary(
conn: Connection, conn: Connection,
client: LLMClient, client: LLMClient,
@@ -296,8 +338,10 @@ async def apply_scene_close_summary(
""" """
# Local imports to keep the module-level surface tight and avoid # Local imports to keep the module-level surface tight and avoid
# any chance of a circular dep through chat.state.*. # any chance of a circular dep through chat.state.*.
from chat.services.thread_detection import detect_threads
from chat.state.entities import get_bot, get_you from chat.state.entities import get_bot, get_you
from chat.state.group_node import get_group_node from chat.state.group_node import get_group_node
from chat.state.threads import list_open_threads
from chat.state.world import get_chat from chat.state.world import get_chat
you_entity = get_you(conn) or {"name": "you", "persona": ""} you_entity = get_you(conn) or {"name": "you", "persona": ""}
@@ -308,6 +352,11 @@ async def apply_scene_close_summary(
dialogue = _read_recent_dialogue(conn, chat_id) dialogue = _read_recent_dialogue(conn, chat_id)
# T58.1: build the "Key quotes:" suffix BEFORE the per-POV rewrites
# land — quote source is the raw seeded pov_summary text on each
# memory row, which the rewrite about to fire would clobber.
key_quotes_suffix = _build_key_quotes_suffix(conn, scene_id)
host_pov = await _summarize_and_apply_for_witness( host_pov = await _summarize_and_apply_for_witness(
conn, conn,
client, client,
@@ -318,6 +367,7 @@ async def apply_scene_close_summary(
you_name=you_name, you_name=you_name,
dialogue=dialogue, dialogue=dialogue,
timeout_s=timeout_s, timeout_s=timeout_s,
key_quotes_suffix=key_quotes_suffix,
) )
guest_pov: ScenePOVSummary | None = None guest_pov: ScenePOVSummary | None = None
@@ -332,6 +382,7 @@ async def apply_scene_close_summary(
you_name=you_name, you_name=you_name,
dialogue=dialogue, dialogue=dialogue,
timeout_s=timeout_s, timeout_s=timeout_s,
key_quotes_suffix=key_quotes_suffix,
) )
# Group node update: T70 runs a third classifier call to merge the # Group node update: T70 runs a third classifier call to merge the
@@ -364,6 +415,56 @@ async def apply_scene_close_summary(
}, },
) )
# T58.2: thread detection on close. Reuses the dialogue we already
# gathered for per-POV summarization — same {speaker, text} shape
# detect_threads expects. Failure-tolerant: classify() returns the
# empty default on retry-exhaustion, and the broad except below
# protects the close pipeline from any other classifier/mock flap.
try:
thread_result = await detect_threads(
client,
classifier_model=classifier_model,
scene_transcript=dialogue,
open_threads=list_open_threads(conn, chat_id),
timeout_s=timeout_s,
)
except Exception:
from chat.services.thread_detection import ThreadDetectionResult
thread_result = ThreadDetectionResult()
for cand in thread_result.candidates:
if cand.action == "open":
new_thread_id = f"thr_{uuid.uuid4().hex[:12]}"
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": new_thread_id,
"chat_id": chat_id,
"title": cand.title,
"summary": cand.summary,
},
)
elif cand.action == "update" and cand.existing_thread_id:
append_and_apply(
conn,
kind="thread_updated",
payload={
"thread_id": cand.existing_thread_id,
"summary": cand.summary,
"last_referenced_scene_id": scene_id,
},
)
elif cand.action == "close" and cand.existing_thread_id:
append_and_apply(
conn,
kind="thread_closed",
payload={
"thread_id": cand.existing_thread_id,
"closed_at": datetime.now(timezone.utc).isoformat(),
},
)
return host_pov return host_pov
+255 -5
View File
@@ -504,13 +504,15 @@ async def test_close_with_no_guest_matches_phase1(tmp_path):
"relationship_summary": "BotA leaned in supportively.", "relationship_summary": "BotA leaned in supportively.",
} }
) )
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn: with open_db(db) as conn:
_seed_single_bot_scene(conn) _seed_single_bot_scene(conn)
project(conn) project(conn)
# canned has 2 entries to detect any over-call; the assertion below # 1 host-POV entry + 1 thread-detection entry (T58.2) + 1 spare
# confirms only one was consumed. # to detect any over-call. Assertion below confirms exactly two
client = MockLLMClient(canned=[canned, canned]) # were consumed.
client = MockLLMClient(canned=[canned, no_threads, canned])
await apply_scene_close_summary( await apply_scene_close_summary(
conn, conn,
client, client,
@@ -520,8 +522,8 @@ async def test_close_with_no_guest_matches_phase1(tmp_path):
host_bot_id="bot_a", host_bot_id="bot_a",
) )
# Exactly one classifier call -> exactly one canned entry consumed, # Host POV + thread detection -> exactly two canned entries
# leaving the second untouched. # consumed, leaving the spare untouched.
assert len(client._canned) == 1 assert len(client._canned) == 1
# Host memory rewritten with the per-POV summary content. # Host memory rewritten with the per-POV summary content.
@@ -845,3 +847,251 @@ async def test_group_summary_skipped_when_no_guest(tmp_path):
"SELECT 1 FROM event_log WHERE kind = 'group_node_updated'" "SELECT 1 FROM event_log WHERE kind = 'group_node_updated'"
).fetchall() ).fetchall()
assert rows == [] assert rows == []
# ---------------------------------------------------------------------------
# T58: significance-driven quote retention + thread detection on close.
# ---------------------------------------------------------------------------
def _seed_single_bot_scene_no_memory(conn) -> None:
"""Like ``_seed_single_bot_scene`` but skips the memory_written event so
callers can seed memories with custom significance / text themselves."""
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(
conn,
kind="you_authored",
payload={"name": "Me", "pronouns": "they/them", "persona": "engineer"},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "Quick chat about the deadline",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "It's going to be okay.",
"truncated": False,
"user_turn_id": 1,
},
)
def _seed_memory(conn, *, pov_summary: str, significance: int) -> None:
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 1,
"pov_summary": pov_summary,
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"significance": significance,
},
)
@pytest.mark.asyncio
async def test_low_significance_scene_omits_quotes(tmp_path):
"""When the scene's max-turn-significance is < 2, the per-POV summary
rewrite collapses fully — no "Key quotes:" suffix is appended."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a low-key chat with you.",
"knowledge_facts": [],
"relationship_summary": "Nothing major shifted.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
_seed_memory(conn, pov_summary="Maya rambled about coffee", significance=1)
_seed_memory(conn, pov_summary="Maya glanced at the clock", significance=0)
project(conn)
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows
for (pov,) in rows:
assert "Key quotes:" not in pov
assert "BotA had a low-key chat" in pov
@pytest.mark.asyncio
async def test_high_significance_scene_includes_top_3_quotes(tmp_path):
"""When max-turn-significance is >= 2, each per-POV summary text gains
a "Key quotes:" suffix listing the top-3 highest-significance memory
rows verbatim, ordered by (significance DESC, id ASC)."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk with you.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
# Insertion order matches id ASC. Top-3 by (sig DESC, id ASC):
# quote 1 (sig 3) -> quote 2 (sig 2, lower id) -> quote 4 (sig 2,
# higher id). quote 3 (sig 1) is dropped.
_seed_memory(conn, pov_summary="Maya quote one", significance=3)
_seed_memory(conn, pov_summary="Maya quote two", significance=2)
_seed_memory(conn, pov_summary="Maya quote three", significance=1)
_seed_memory(conn, pov_summary="Maya quote four", significance=2)
project(conn)
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows
for (pov,) in rows:
assert "Key quotes:" in pov
assert '"Maya quote one"' in pov
assert '"Maya quote two"' in pov
assert '"Maya quote four"' in pov
# The sig-1 quote falls outside the top-3 cap.
assert '"Maya quote three"' not in pov
# Ordering: sig 3 first, then the two sig-2s by id ASC.
i_one = pov.index('"Maya quote one"')
i_two = pov.index('"Maya quote two"')
i_four = pov.index('"Maya quote four"')
assert i_one < i_two < i_four
@pytest.mark.asyncio
async def test_thread_detection_emits_events(tmp_path, monkeypatch):
"""On scene close, ``detect_threads`` is invoked and each "open"
candidate yields a ``thread_opened`` event with a fresh thread_id."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA noticed something unresolved.",
"knowledge_facts": [],
"relationship_summary": "Tension lingered.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="open",
title="Test thread",
summary="A test",
existing_thread_id=None,
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_opened'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["title"] == "Test thread"
assert payload["summary"] == "A test"
assert payload["chat_id"] == "chat_bot_a"
assert payload["thread_id"].startswith("thr_")
# The threads-table projection ran via append_and_apply.
from chat.state.threads import list_open_threads
open_threads = list_open_threads(conn, "chat_bot_a")
assert len(open_threads) == 1
assert open_threads[0]["title"] == "Test thread"