test: T58 coverage gaps (truncation, update/close paths) (T80.5)

Three gaps left by T58's initial test coverage:

* test_key_quote_truncation_at_200_chars — exercises the 200-char hard
  slice in _build_key_quotes_suffix so any future change to the
  truncation strategy (ellipsis, word boundary, etc) trips the test.
* test_thread_detection_update_candidate_emits_thread_updated —
  pins the ``update`` action emission shape (thread_id, summary,
  last_referenced_scene_id).
* test_thread_detection_close_candidate_emits_thread_closed — pins
  the ``close`` action emission shape (thread_id, closed_at).

No production change; pure coverage add.
This commit is contained in:
Joseph Doherty
2026-04-26 21:50:55 -04:00
parent b91a5e9293
commit 0d3bbf4272
+194
View File
@@ -1749,3 +1749,197 @@ async def test_thread_closed_uses_chat_clock_time(tmp_path, monkeypatch):
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
assert payload["closed_at"] == chat_clock
# ---------------------------------------------------------------------------
# T80.5: T58 coverage gaps (truncation, thread update/close emissions).
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_key_quote_truncation_at_200_chars(tmp_path):
"""T80.5: when a memory's pov_summary exceeds 200 chars, the
Key-quote bullet truncates the source text to exactly 200 chars
(no ellipsis — a hard slice, per the existing T58 implementation)."""
db = tmp_path / "t.db"
apply_migrations(db)
canned = json.dumps(
{
"summary": "BotA had a heavy talk.",
"knowledge_facts": [],
"relationship_summary": "Things shifted.",
}
)
no_threads = json.dumps({"candidates": []})
long_text = "X" * 500 # 500 X's; expected slice is 200 X's.
with open_db(db) as conn:
_seed_single_bot_scene_no_memory(conn)
_seed_memory(conn, pov_summary=long_text, significance=2)
project(conn)
client = MockLLMClient(canned=[canned, no_threads])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
new_pov = conn.execute(
"SELECT pov_summary FROM memories WHERE scene_id = 1"
).fetchone()[0]
assert "Key quotes:" in new_pov
# The bullet should contain exactly 200 X's, not 500.
# Format from _build_key_quotes_suffix: ``- "<text>"``.
bullet_marker = '- "'
idx = new_pov.index(bullet_marker)
# Count consecutive X's after the bullet marker.
x_run = 0
for ch in new_pov[idx + len(bullet_marker):]:
if ch == "X":
x_run += 1
else:
break
assert x_run == 200, (
f"expected 200-char truncation, got {x_run}"
)
@pytest.mark.asyncio
async def test_thread_detection_update_candidate_emits_thread_updated(
tmp_path, monkeypatch
):
"""T80.5: a detect_threads ``update`` candidate produces a
``thread_updated`` event with the candidate's summary and a
last_referenced_scene_id pointing at the closed scene."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="update",
existing_thread_id="thr_x",
summary="updated summary",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
# Pre-seed the open thread so the update has a row to target.
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "old summary",
},
)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_updated'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
assert payload["summary"] == "updated summary"
assert payload["last_referenced_scene_id"] == 1
@pytest.mark.asyncio
async def test_thread_detection_close_candidate_emits_thread_closed(
tmp_path, monkeypatch
):
"""T80.5: a detect_threads ``close`` candidate produces a
``thread_closed`` event for the existing thread."""
from chat.services import thread_detection as td_mod
canned = json.dumps(
{
"summary": "BotA had a quick chat.",
"knowledge_facts": [],
"relationship_summary": "Steady.",
}
)
async def fake_detect_threads(client, **kwargs):
return td_mod.ThreadDetectionResult(
candidates=[
td_mod.ThreadCandidate(
action="close",
existing_thread_id="thr_x",
summary="resolved",
),
]
)
monkeypatch.setattr(td_mod, "detect_threads", fake_detect_threads)
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_single_bot_scene(conn)
from chat.eventlog.log import append_and_apply
import chat.state.threads # noqa: F401
append_and_apply(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_x",
"chat_id": "chat_bot_a",
"title": "Lingering question",
"summary": "open",
},
)
project(conn)
client = MockLLMClient(canned=[canned])
await apply_scene_close_summary(
conn,
client,
classifier_model="x",
chat_id="chat_bot_a",
scene_id=1,
host_bot_id="bot_a",
)
rows = conn.execute(
"SELECT payload_json FROM event_log WHERE kind = 'thread_closed'"
).fetchall()
assert len(rows) == 1
payload = json.loads(rows[0][0])
assert payload["thread_id"] == "thr_x"
# closed_at field is present (T80.4 verifies its value).
assert "closed_at" in payload