Files
chat/tests/test_per_pov_summary.py
T
Joseph Doherty d123684f9a fix: guard scene close key-quote suffix against re-close bloat (T80.1)
Re-running apply_scene_close_summary on the same scene previously caused
recursive bloat: _build_key_quotes_suffix sourced quote text from
memories.pov_summary, which after the first close already carried a
"Key quotes:" suffix. The next close would then quote the quotes,
nesting deeper each time.

Strip any existing suffix from candidate text before truncating to
200 chars in the suffix builder, and from the fresh classifier output
before composing the new value in _summarize_and_apply_for_witness so
the rewrite replaces rather than stacks.

Adds test_scene_close_re_run_does_not_double_suffix.
2026-04-26 21:46:20 -04:00

1493 lines
47 KiB
Python

"""Per-POV summary and edge summary update on scene close (T27).
When a scene closes (via the auto-close path in the turn flow or the
manual button in the drawer), we run a classifier that produces a
per-POV summary for each present witness — Phase 1 single-bot only the
host bot, since "you" doesn't have a memory store in v1. The output
drives three projected updates:
1. Each ``memories`` row for the closed scene owned by the host bot has
its ``pov_summary`` rewritten via ``manual_edit`` events
(``target_kind="memory_pov_summary"``) so the field carries a proper
scene-level summary instead of the per-turn raw narrative seeded by
T21.
2. The directed bot->you ``edges.summary`` is updated via a new
``manual_edit`` target_kind ``edge_summary``. v1 strategy combines
the prior summary with the classifier's ``relationship_summary``
field; the LLM is the one phrasing the merge.
3. Newly-learned facts from the classifier's ``knowledge_facts`` field
are appended via the existing ``edge_update`` event handler.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_event
from chat.eventlog.projector import project
from chat.llm.mock import MockLLMClient
from chat.services.scene_summarize import (
ScenePOVSummary,
apply_scene_close_summary,
summarize_scene,
)
# Importing for handler-registration side effects so the freshly-migrated
# DB created in each test below has the projector ready.
import chat.state.edges # noqa: F401
import chat.state.entities # noqa: F401
import chat.state.manual_edit # noqa: F401
import chat.state.memory # noqa: F401
import chat.state.world # noqa: F401
# ---------------------------------------------------------------------------
# Service-level tests — no FastAPI involvement.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_summarize_scene_parses_classifier_output():
canned = json.dumps(
{
"summary": "BotA shared a quiet moment with you in the office.",
"knowledge_facts": ["You like coffee black."],
"relationship_summary": "BotA feels closer to you after this conversation.",
}
)
mock = MockLLMClient(canned=[canned])
result = await summarize_scene(
mock,
model="x",
bot_name="BotA",
bot_persona="thoughtful",
you_name="Me",
prior_edge_summary="",
dialogue=[
{"speaker": "Me", "text": "hi"},
{"speaker": "BotA", "text": "Hello!"},
],
)
assert isinstance(result, ScenePOVSummary)
assert result.summary.startswith("BotA shared")
assert result.knowledge_facts == ["You like coffee black."]
assert "closer" in result.relationship_summary
@pytest.mark.asyncio
async def test_summarize_scene_default_on_failure():
"""Two consecutive non-JSON returns trip the classifier's retry-then-default
path; we should get the empty fallback rather than crashing the close
flow."""
mock = MockLLMClient(canned=["bad", "still bad", "bad3"])
result = await summarize_scene(
mock,
model="x",
bot_name="BotA",
bot_persona="",
you_name="Me",
prior_edge_summary="",
dialogue=[],
)
assert result.summary == ""
assert result.knowledge_facts == []
assert result.relationship_summary == ""
@pytest.mark.asyncio
async def test_apply_scene_close_summary_updates_memories_and_edge(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA reassured you about the project deadline.",
"knowledge_facts": ["You are nervous about the deadline."],
"relationship_summary": "BotA showed quiet support.",
}
)
with open_db(db) as conn:
# Seed bot, you, chat, container, scene, edge, memory, dialogue.
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "...",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "",
},
)
append_event(
conn,
kind="you_authored",
payload={
"name": "Me",
"pronouns": "they/them",
"persona": "engineer",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 1,
"pov_summary": "Original raw narrative",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"significance": 1,
},
)
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "I'm nervous about the deadline",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "It's going to be okay.",
"truncated": False,
"user_turn_id": 1,
},
)
project(conn)
client = MockLLMClient(canned=[canned])
result = await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Returned summary plumbs through.
assert "reassured" in result.summary
assert result.knowledge_facts == ["You are nervous about the deadline."]
# Memory pov_summary updated.
new_pov = conn.execute(
"SELECT pov_summary FROM memories "
"WHERE owner_id = 'bot_a' AND scene_id = 1"
).fetchone()[0]
assert "reassured" in new_pov
# And the manual_edit event was logged with prior_value capture.
edits = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'manual_edit'"
).fetchall()
assert any(
json.loads(p[0]).get("target_kind") == "memory_pov_summary"
for p in edits
)
mem_edit = next(
json.loads(p[0])
for p in edits
if json.loads(p[0]).get("target_kind") == "memory_pov_summary"
)
assert mem_edit["prior_value"] == "Original raw narrative"
# Edge summary updated via manual_edit (target_kind="edge_summary").
from chat.state.edges import get_edge
edge = get_edge(conn, "bot_a", "you")
assert "support" in edge["summary"]
assert any(
json.loads(p[0]).get("target_kind") == "edge_summary"
for p in edits
)
# Knowledge fact appended via edge_update.
assert any("deadline" in fact for fact in edge["knowledge"])
# ---------------------------------------------------------------------------
# T45: per-POV summaries on close for each present witness.
# ---------------------------------------------------------------------------
def _bot_payload(bot_id: str, name: str, persona: str = "thoughtful") -> dict:
return {
"id": bot_id,
"name": name,
"persona": persona,
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "",
}
def _seed_single_bot_scene(conn) -> None:
"""Seed the canonical Phase 1 single-bot scene used by the regression test."""
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(
conn,
kind="you_authored",
payload={"name": "Me", "pronouns": "they/them", "persona": "engineer"},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 1,
"pov_summary": "Original raw narrative (host)",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"significance": 1,
},
)
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "Quick chat about the deadline",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "It's going to be okay.",
"truncated": False,
"user_turn_id": 1,
},
)
def _seed_two_bot_scene(conn, *, with_group_node: bool = False) -> None:
"""Seed a host+guest scene with bot_a (host) and bot_b (guest), plus a
memory row per bot owner so each per-POV update has something to rewrite,
and seeded directed edges from each bot to ``you`` so each edge_summary
update has a row to operate on. Optionally seeds the group_node row too.
"""
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB"))
append_event(
conn,
kind="you_authored",
payload={"name": "Me", "pronouns": "they/them", "persona": "engineer"},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"guest_bot_id": "bot_b",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a", "bot_b"],
},
)
# Seed edges in both bot -> you directions so the edge_summary updates
# have rows to target.
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_b",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
# One memory per witness, scene 1.
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 1,
"pov_summary": "Original raw narrative (host)",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 1,
"significance": 1,
},
)
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_b",
"chat_id": "chat_bot_a",
"scene_id": 1,
"pov_summary": "Original raw narrative (guest)",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 1,
"significance": 1,
},
)
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "Three of us in the office.",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "Glad you're both here.",
"truncated": False,
"user_turn_id": 1,
},
)
if with_group_node:
append_event(
conn,
kind="group_node_initialized",
payload={
"chat_id": "chat_bot_a",
"members": ["you", "bot_a", "bot_b"],
"summary": "",
"dynamic": "",
"threads": [],
},
)
@pytest.mark.asyncio
async def test_close_with_no_guest_matches_phase1(tmp_path):
"""Regression: when guest_bot_id is None, the close summary path runs
summarize_scene exactly once and rewrites the host's memory + host->you
edge in place — same as Phase 1 behavior."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA helped you talk through the deadline anxiety.",
"knowledge_facts": ["Deadline next Friday."],
"relationship_summary": "BotA leaned in supportively.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
# 1 host-POV entry + 1 thread-detection entry (T58.2) + 1 spare
# to detect any over-call. Assertion below confirms exactly two
# were consumed.
client = MockLLMClient(canned=[canned, no_threads, canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Host POV + thread detection -> exactly two canned entries
# consumed, leaving the spare untouched.
assert len(client._canned) == 1
# Host memory rewritten with the per-POV summary content.
new_pov = conn.execute(
"SELECT pov_summary FROM memories "
"WHERE owner_id = 'bot_a' AND scene_id = 1"
).fetchone()[0]
assert "BotA helped" in new_pov
# host->you edge summary rewritten with the relationship_summary.
from chat.state.edges import get_edge
edge = get_edge(conn, "bot_a", "you")
assert "supportively" in edge["summary"]
@pytest.mark.asyncio
async def test_close_with_guest_calls_summarize_twice(tmp_path):
"""When a guest is present, summarize_scene runs once per witness
(host + guest) and each bot's memory rewrite uses its own POV summary."""
db = tmp_path / "t.db"
apply_migrations(db)
host_canned = json.dumps(
{
"summary": "BotA noticed BotB warming up to you.",
"knowledge_facts": ["You sketched on the whiteboard."],
"relationship_summary": "BotA felt steady around you.",
}
)
guest_canned = json.dumps(
{
"summary": "BotB found the office quieter than expected.",
"knowledge_facts": ["You prefer black coffee."],
"relationship_summary": "BotB warmed up to you a little.",
}
)
with open_db(db) as conn:
_seed_two_bot_scene(conn)
project(conn)
client = MockLLMClient(canned=[host_canned, guest_canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Both canned entries consumed -> classifier ran twice.
assert client._canned == []
# Host memory carries the host's per-POV summary; guest memory
# carries the guest's.
host_pov = conn.execute(
"SELECT pov_summary FROM memories "
"WHERE owner_id = 'bot_a' AND scene_id = 1"
).fetchone()[0]
guest_pov = conn.execute(
"SELECT pov_summary FROM memories "
"WHERE owner_id = 'bot_b' AND scene_id = 1"
).fetchone()[0]
assert "BotA noticed" in host_pov
assert "BotB found" in guest_pov
assert host_pov != guest_pov
@pytest.mark.asyncio
async def test_close_with_guest_updates_both_edges(tmp_path):
"""Both bot->you edges receive their own relationship_summary on close."""
db = tmp_path / "t.db"
apply_migrations(db)
host_canned = json.dumps(
{
"summary": "BotA noticed BotB warming up.",
"knowledge_facts": [],
"relationship_summary": "BotA felt steady around you.",
}
)
guest_canned = json.dumps(
{
"summary": "BotB warmed to the office.",
"knowledge_facts": [],
"relationship_summary": "BotB warmed up to you a little.",
}
)
with open_db(db) as conn:
_seed_two_bot_scene(conn)
project(conn)
client = MockLLMClient(canned=[host_canned, guest_canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
from chat.state.edges import get_edge
edge_h2y = get_edge(conn, "bot_a", "you")
edge_g2y = get_edge(conn, "bot_b", "you")
assert "steady" in edge_h2y["summary"]
assert "warmed up" in edge_g2y["summary"]
# Per-POV; the two edges did not collapse onto the same text.
assert edge_h2y["summary"] != edge_g2y["summary"]
@pytest.mark.asyncio
async def test_close_with_group_node_updates_group_summary(tmp_path):
"""When a group_node row exists, scene close emits group_node_updated
with a non-empty summary that mentions both bots' names. T70 swapped
the Phase 2 naive concat for an LLM-merged summary; this regression
test feeds bad-JSON merge responses so the helper falls back to the
original naive-concat shape, preserving the original assertions."""
db = tmp_path / "t.db"
apply_migrations(db)
import chat.state.group_node # noqa: F401 -- register handlers
host_canned = json.dumps(
{
"summary": "BotA appreciated the calm.",
"knowledge_facts": [],
"relationship_summary": "BotA felt steady.",
}
)
guest_canned = json.dumps(
{
"summary": "BotB found the room friendly.",
"knowledge_facts": [],
"relationship_summary": "BotB warmed up.",
}
)
with open_db(db) as conn:
_seed_two_bot_scene(conn, with_group_node=True)
project(conn)
# 2 valid (host POV, guest POV) + 3 bad-JSON merge attempts ->
# merge_group_summary falls back to the naive concat default.
client = MockLLMClient(
canned=[host_canned, guest_canned, "bad1", "bad2", "bad3"]
)
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
from chat.state.group_node import get_group_node
gn = get_group_node(conn, "chat_bot_a")
assert gn is not None
assert gn["summary"] # non-empty
# Naive-concat fallback surfaces both bot names in the group summary.
assert "BotA" in gn["summary"]
assert "BotB" in gn["summary"]
# Naive-concat fallback keeps dynamic empty.
assert gn["dynamic"] == ""
# ---------------------------------------------------------------------------
# T70: LLM-merged group meta-summary on scene close.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_group_summary_merges_per_pov_via_classifier_when_guest_present(
tmp_path,
):
"""With a guest present and a group_node row, scene close runs the
merge classifier as a third call after the two per-POV summarize_scene
calls; its output drives the group_node summary + dynamic fields."""
db = tmp_path / "t.db"
apply_migrations(db)
import chat.state.group_node # noqa: F401 -- register handlers
host_canned = json.dumps(
{
"summary": "BotA appreciated the calm.",
"knowledge_facts": [],
"relationship_summary": "BotA felt steady.",
}
)
guest_canned = json.dumps(
{
"summary": "BotB found the room friendly.",
"knowledge_facts": [],
"relationship_summary": "BotB warmed up.",
}
)
merge_canned = json.dumps(
{"summary": "merged group view", "dynamic": "warm rapport"}
)
with open_db(db) as conn:
_seed_two_bot_scene(conn, with_group_node=True)
project(conn)
# Canned-queue layout matches the production call order in
# apply_scene_close_summary: host POV summarize_scene runs first,
# then guest POV summarize_scene, then merge_group_summary.
client = MockLLMClient(
canned=[host_canned, guest_canned, merge_canned]
)
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# All three canned entries consumed -> classifier ran exactly 3x.
assert client._canned == []
from chat.state.group_node import get_group_node
gn = get_group_node(conn, "chat_bot_a")
assert gn is not None
assert gn["summary"] == "merged group view"
assert gn["dynamic"] == "warm rapport"
@pytest.mark.asyncio
async def test_group_summary_falls_back_to_naive_concat_on_classifier_failure(
tmp_path,
):
"""If the merge classifier flaps (bad JSON across all 3 retries), the
helper falls back to the original Phase 2 naive concat shape and
leaves dynamic empty."""
db = tmp_path / "t.db"
apply_migrations(db)
import chat.state.group_node # noqa: F401 -- register handlers
host_canned = json.dumps(
{
"summary": "BotA appreciated the calm.",
"knowledge_facts": [],
"relationship_summary": "BotA felt steady.",
}
)
guest_canned = json.dumps(
{
"summary": "BotB found the room friendly.",
"knowledge_facts": [],
"relationship_summary": "BotB warmed up.",
}
)
with open_db(db) as conn:
_seed_two_bot_scene(conn, with_group_node=True)
project(conn)
# 2 valid POV summaries + 3 bad-JSON merge attempts trip the
# classifier's retry-then-default path; the default is the naive
# concat fallback.
client = MockLLMClient(
canned=[host_canned, guest_canned, "bad1", "bad2", "bad3"]
)
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
from chat.state.group_node import get_group_node
gn = get_group_node(conn, "chat_bot_a")
assert gn is not None
expected = (
"BotA: BotA appreciated the calm.\n\n"
"BotB: BotB found the room friendly."
)
assert gn["summary"] == expected
assert gn["dynamic"] == ""
@pytest.mark.asyncio
async def test_group_summary_skipped_when_no_guest(tmp_path):
"""No-guest path: scene close does NOT invoke merge_group_summary
and emits no group_node_updated event. Confirms the existing
`if guest_bot_id is not None` gating at the call site."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA helped you talk through the deadline anxiety.",
"knowledge_facts": ["Deadline next Friday."],
"relationship_summary": "BotA leaned in supportively.",
}
)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
# Only 1 canned entry; if merge_group_summary were called the
# MockLLMClient would IndexError on the empty queue.
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
# Exactly the host POV call consumed, nothing else.
assert client._canned == []
# No group_node_updated event was emitted.
rows = conn.execute(
"SELECT 1 FROM event_log WHERE kind = 'group_node_updated'"
).fetchall()
assert rows == []
# ---------------------------------------------------------------------------
# T58: significance-driven quote retention + thread detection on close.
# ---------------------------------------------------------------------------
def _seed_single_bot_scene_no_memory(conn) -> None:
"""Like ``_seed_single_bot_scene`` but skips the memory_written event so
callers can seed memories with custom significance / text themselves."""
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(
conn,
kind="you_authored",
payload={"name": "Me", "pronouns": "they/them", "persona": "engineer"},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a"],
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "Quick chat about the deadline",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "It's going to be okay.",
"truncated": False,
"user_turn_id": 1,
},
)
def _seed_memory(conn, *, pov_summary: str, significance: int) -> None:
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 1,
"pov_summary": pov_summary,
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"significance": significance,
},
)
@pytest.mark.asyncio
async def test_low_significance_scene_omits_quotes(tmp_path):
"""When the scene's max-turn-significance is < 2, the per-POV summary
rewrite collapses fully — no "Key quotes:" suffix is appended."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a low-key chat with you.",
"knowledge_facts": [],
"relationship_summary": "Nothing major shifted.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
_seed_memory(conn, pov_summary="Maya rambled about coffee", significance=1)
_seed_memory(conn, pov_summary="Maya glanced at the clock", significance=0)
project(conn)
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows
for (pov,) in rows:
assert "Key quotes:" not in pov
assert "BotA had a low-key chat" in pov
@pytest.mark.asyncio
async def test_high_significance_scene_includes_top_3_quotes(tmp_path):
"""When max-turn-significance is >= 2, each per-POV summary text gains
a "Key quotes:" suffix listing the top-3 highest-significance memory
rows verbatim, ordered by (significance DESC, id ASC)."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk with you.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
# Insertion order matches id ASC. Top-3 by (sig DESC, id ASC):
# quote 1 (sig 3) -> quote 2 (sig 2, lower id) -> quote 4 (sig 2,
# higher id). quote 3 (sig 1) is dropped.
_seed_memory(conn, pov_summary="Maya quote one", significance=3)
_seed_memory(conn, pov_summary="Maya quote two", significance=2)
_seed_memory(conn, pov_summary="Maya quote three", significance=1)
_seed_memory(conn, pov_summary="Maya quote four", significance=2)
project(conn)
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows
for (pov,) in rows:
assert "Key quotes:" in pov
assert '"Maya quote one"' in pov
assert '"Maya quote two"' in pov
assert '"Maya quote four"' in pov
# The sig-1 quote falls outside the top-3 cap.
assert '"Maya quote three"' not in pov
# Ordering: sig 3 first, then the two sig-2s by id ASC.
i_one = pov.index('"Maya quote one"')
i_two = pov.index('"Maya quote two"')
i_four = pov.index('"Maya quote four"')
assert i_one < i_two < i_four
@pytest.mark.asyncio
async def test_thread_detection_emits_events(tmp_path, monkeypatch):
"""On scene close, ``detect_threads`` is invoked and each "open"
candidate yields a ``thread_opened`` event with a fresh thread_id."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA noticed something unresolved.",
"knowledge_facts": [],
"relationship_summary": "Tension lingered.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="open",
title="Test thread",
summary="A test",
existing_thread_id=None,
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_opened'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["title"] == "Test thread"
assert payload["summary"] == "A test"
assert payload["chat_id"] == "chat_bot_a"
assert payload["thread_id"].startswith("thr_")
# The threads-table projection ran via append_and_apply.
from chat.state.threads import list_open_threads
open_threads = list_open_threads(conn, "chat_bot_a")
assert len(open_threads) == 1
assert open_threads[0]["title"] == "Test thread"
# ---------------------------------------------------------------------------
# T65: meanwhile summary digest emitted on meanwhile-scene close, surfaced in
# the next you-scene's prompt as a SHOULD-tier "Meanwhile while you were away:"
# block, then consumed so it never re-renders.
# ---------------------------------------------------------------------------
def _seed_meanwhile_scene(conn) -> None:
"""Seed a parent you-scene + a meanwhile child scene with one assistant
turn so apply_scene_close_summary has dialogue to summarize.
The meanwhile scene id is 2 (parent is scene 1). The meanwhile dialogue
is appended via assistant_turn events under chat_bot_a; the
_read_recent_dialogue helper picks them up by chat_id.
"""
import chat.state.meanwhile # noqa: F401 -- register handlers
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB"))
append_event(
conn,
kind="you_authored",
payload={"name": "Me", "pronouns": "they/them", "persona": "engineer"},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"guest_bot_id": "bot_b",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="container_created",
payload={
"chat_id": "chat_bot_a",
"name": "office",
"type": "workplace",
"properties": {},
},
)
# Parent you-scene (scene_id=1).
append_event(
conn,
kind="scene_opened",
payload={
"chat_id": "chat_bot_a",
"container_id": 1,
"started_at": "2026-04-26T20:00:00+00:00",
"participants": ["you", "bot_a", "bot_b"],
},
)
# Meanwhile child scene (scene_id=2) — bot_a + bot_b only.
append_event(
conn,
kind="meanwhile_scene_started",
payload={
"scene_id": 2,
"chat_id": "chat_bot_a",
"parent_scene_id": 1,
"host_bot_id": "bot_a",
"guest_bot_id": "bot_b",
"started_at": "2026-04-26T20:05:00+00:00",
},
)
# Edges so per-POV apply has rows to update.
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_b",
"target_id": "you",
"chat_id": "chat_bot_a",
},
)
# One memory per witness in the meanwhile scene.
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"scene_id": 2,
"pov_summary": "Original raw narrative (host, meanwhile)",
"witness_you": 0,
"witness_host": 1,
"witness_guest": 1,
"significance": 1,
},
)
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_b",
"chat_id": "chat_bot_a",
"scene_id": 2,
"pov_summary": "Original raw narrative (guest, meanwhile)",
"witness_you": 0,
"witness_host": 1,
"witness_guest": 1,
"significance": 1,
},
)
# A bot-bot turn happens during the meanwhile scene.
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "Did you hear what happened with the missing file?",
"truncated": False,
"user_turn_id": None,
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_b",
"text": "I have a theory but no proof yet.",
"truncated": False,
"user_turn_id": None,
},
)
@pytest.mark.asyncio
async def test_meanwhile_close_creates_digest(tmp_path):
"""When apply_scene_close_summary runs on a meanwhile scene
(present_set_kind == 'host_guest'), it emits a meanwhile_digest_created
event after the per-POV summaries land; the meanwhile_digest_pending
table then holds a row with non-empty summary text."""
db = tmp_path / "t.db"
apply_migrations(db)
host_canned = json.dumps(
{
"summary": "BotA confided in BotB about the missing file.",
"knowledge_facts": [],
"relationship_summary": "BotA leaned on BotB.",
}
)
guest_canned = json.dumps(
{
"summary": "BotB listened and offered to help investigate.",
"knowledge_facts": [],
"relationship_summary": "BotB grew protective.",
}
)
digest_canned = json.dumps(
{
"summary": (
"While you were away, BotA confided in BotB about a "
"missing file; BotB offered to help quietly investigate."
),
"knowledge_facts": [],
"relationship_summary": "",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_meanwhile_scene(conn)
project(conn)
# Order: host POV summary, guest POV summary, digest summary,
# thread detection.
client = MockLLMClient(
canned=[host_canned, guest_canned, digest_canned, no_threads]
)
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=2,
host_bot_id="bot_a",
)
# The meanwhile_digest_pending row was written.
from chat.state.meanwhile import list_pending_meanwhile_digests
pending = list_pending_meanwhile_digests(conn, "chat_bot_a")
assert len(pending) == 1
assert pending[0]["scene_id"] == 2
assert pending[0]["summary"]
assert "missing file" in pending[0]["summary"]
# And the meanwhile_digest_created event was logged.
rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'meanwhile_digest_created'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["chat_id"] == "chat_bot_a"
assert payload["scene_id"] == 2
assert "missing file" in payload["summary"]
def test_pending_digest_renders_in_you_scene_prompt(tmp_path):
"""A pending meanwhile digest (created via direct event append) renders
as a 'Meanwhile while you were away:' SHOULD-tier block in the
assembled you-scene narrative prompt."""
from chat.eventlog.log import append_and_apply
from chat.services.prompt import assemble_narrative_prompt
import chat.state.meanwhile # noqa: F401 -- register handlers
import chat.state.threads # noqa: F401
import chat.state.events # noqa: F401
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
digest_text = (
"While you were away, BotA confided in BotB about a missing file."
)
append_and_apply(
conn,
kind="meanwhile_digest_created",
payload={
"chat_id": "chat_bot_a",
"scene_id": 2,
"summary": digest_text,
},
)
msgs = assemble_narrative_prompt(
conn,
chat_id="chat_bot_a",
speaker_bot_id="bot_a",
recent_dialogue=[],
retrieved_memory_summaries=[],
)
body = msgs[0].content
assert "Meanwhile while you were away:" in body
assert digest_text in body
def test_consumed_digest_does_not_render_again(tmp_path):
"""After meanwhile_digest_consumed lands for a digest, reassembling the
you-scene prompt must NOT include that digest's text — the pending
list is filtered by ``consumed_at IS NULL``."""
from chat.eventlog.log import append_and_apply
from chat.services.prompt import assemble_narrative_prompt
import chat.state.meanwhile # noqa: F401 -- register handlers
import chat.state.threads # noqa: F401
import chat.state.events # noqa: F401
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
project(conn)
digest_text = (
"While you were away, BotA confided in BotB about a missing file."
)
append_and_apply(
conn,
kind="meanwhile_digest_created",
payload={
"chat_id": "chat_bot_a",
"scene_id": 2,
"summary": digest_text,
},
)
# Sanity: it renders before consumption.
msgs = assemble_narrative_prompt(
conn,
chat_id="chat_bot_a",
speaker_bot_id="bot_a",
recent_dialogue=[],
retrieved_memory_summaries=[],
)
assert digest_text in msgs[0].content
# Look up the pending digest id, then consume it.
from chat.state.meanwhile import list_pending_meanwhile_digests
pending = list_pending_meanwhile_digests(conn, "chat_bot_a")
assert len(pending) == 1
digest_id = pending[0]["id"]
append_and_apply(
conn,
kind="meanwhile_digest_consumed",
payload={
"digest_id": digest_id,
"consumed_at": "2026-04-26T20:30:00+00:00",
},
)
msgs2 = assemble_narrative_prompt(
conn,
chat_id="chat_bot_a",
speaker_bot_id="bot_a",
recent_dialogue=[],
retrieved_memory_summaries=[],
)
body2 = msgs2[0].content
assert "Meanwhile while you were away:" not in body2
assert digest_text not in body2
# ---------------------------------------------------------------------------
# T80: scene_summarize polish bundle.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_scene_close_re_run_does_not_double_suffix(tmp_path):
"""T80.1: re-running ``apply_scene_close_summary`` on the same scene
must NOT stack a second "Key quotes:" suffix on each pov_summary. The
builder strips any existing suffix from candidate text before
composing the new one, and the per-POV write replaces (not appends
to) the existing suffix.
"""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk with you.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
# Significance >= 2 triggers the Key quotes suffix path.
_seed_memory(conn, pov_summary="Maya quote one", significance=3)
_seed_memory(conn, pov_summary="Maya quote two", significance=2)
project(conn)
# First close.
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows
for (pov,) in rows:
assert pov.count("Key quotes:") == 1
# Second close on the same scene with fresh canned responses.
client2 = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client2,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows2 = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchall()
assert rows2
for (pov,) in rows2:
# Still exactly ONE "Key quotes:" suffix — no recursive bloat.
assert pov.count("Key quotes:") == 1
# And no nested-quote artifacts (the suffix wasn't sourced
# from a row whose text already contained the suffix).
inner_count = pov.count("Key quotes:")
assert inner_count == 1