merge: T74 turn-flow polish + addressee service

This commit is contained in:
Joseph Doherty
2026-04-26 17:43:04 -04:00
4 changed files with 505 additions and 33 deletions
+108
View File
@@ -0,0 +1,108 @@
"""Addressee classifier service (T74.1).
Phase 2 (T44) detected the addressee — host vs. guest — with a simple
case-insensitive whole-word substring match against the bots' names.
That worked for the obvious case ("BotB, what do you think?") but lost
the long tail: pronouns, paraphrases, indirect address, narrative
focus on a particular party. T74.1 swaps the substring helper for a
classifier call that reads the prose holistically.
The substring helper in :mod:`chat.web.turns` is kept as a fast-path
for the no-guest case (only one bot present means there is nothing to
classify) and as a non-breaking fallback for the regenerate path. The
multi-entity branch in :func:`chat.web.turns.post_turn` calls
:func:`detect_addressee` from this module.
Failure mode: classifier flake or low-confidence response degrades to
the host (the default speaker per Phase 2's host-keeps-the-floor
bias). The decision carries ``confidence`` and ``reason`` so callers
that want to log degraded decisions can distinguish a real "host" call
from a fallback.
"""
from __future__ import annotations
from pydantic import BaseModel
from chat.llm.classify import classify
from chat.llm.client import LLMClient
class AddresseeDecision(BaseModel):
"""Which present bot the user is addressing.
``addressee_id`` is the chosen bot's id. ``confidence`` is one of
``"high"`` / ``"medium"`` / ``"low"`` — callers may treat ``"low"``
as a soft fallback to the host. ``reason`` is a short free-form
string. The classifier-failure fallback uses ``reason="fallback"``
so it's distinguishable from a real low-confidence call.
"""
addressee_id: str
confidence: str = "medium" # "high" | "medium" | "low"
reason: str = ""
_SYSTEM = (
"Given a user's turn prose and the names of present bots, decide "
"which bot the user is addressing. If the user is speaking to no "
"specific bot (descriptive narration, action without dialogue), "
"default to the host. Output strict JSON matching the schema. "
"The addressee_id MUST be one of the ids supplied in the user "
"message — do not invent ids."
)
async def detect_addressee(
client: LLMClient,
*,
classifier_model: str,
user_prose: str,
host_id: str,
host_name: str,
guest_id: str | None,
guest_name: str | None,
timeout_s: float = 30.0,
) -> AddresseeDecision:
"""Classify which present bot the user is addressing.
Defaults to host on classifier failure or when the classifier picks
an id that isn't one of the supplied ids. The caller is expected to
only invoke this in the multi-entity case (a guest is present);
when no guest is present the substring fast-path in
:mod:`chat.web.turns` is used instead and this function is not
called.
"""
fallback = AddresseeDecision(
addressee_id=host_id, confidence="low", reason="fallback"
)
user = (
f"Host: {host_name} (id={host_id})\n"
+ (
f"Guest: {guest_name} (id={guest_id})\n"
if guest_id is not None
else ""
)
+ f"\nUser prose:\n{user_prose}"
)
decision = await classify(
client,
model=classifier_model,
system=_SYSTEM,
user=user,
schema=AddresseeDecision,
default=fallback,
timeout_s=timeout_s,
)
# Defensive: if the classifier returned an id outside the supplied
# set, treat it as a fallback to the host. This catches pathological
# outputs that pass schema validation but pick a phantom id.
valid_ids = {host_id}
if guest_id is not None:
valid_ids.add(guest_id)
if decision.addressee_id not in valid_ids:
return fallback
return decision
__all__ = ["AddresseeDecision", "detect_addressee"]
+62 -7
View File
@@ -55,6 +55,7 @@ from fastapi import APIRouter, Depends, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from chat.eventlog.log import append_and_apply, append_event
from chat.services.addressee import detect_addressee
from chat.services.background import SignificanceJob
from chat.services.interjection import detect_interjection
from chat.services.memory_write import record_turn_memory_for_present
@@ -235,11 +236,12 @@ async def post_turn(
guest_bot = None
guest_bot_id = chat.get("guest_bot_id")
if guest_bot_id is not None:
# T47's bot_reset cascade clears guest_bot_id from any chat that
# referenced the deleted bot, so by the time we read it here it's
# either None or a live bot id. The previous defensive
# degrade-to-1:1 block (T44) was rendered dead by T47 and removed
# in T74.4 — get_bot now returns a real row.
guest_bot = get_bot(conn, guest_bot_id)
# If the chat references a deleted guest we degrade to single-bot
# rather than 404 — the chat is still usable as a 1:1.
if guest_bot is None:
guest_bot_id = None
settings = request.app.state.settings
@@ -262,8 +264,25 @@ async def post_turn(
# 3. Determine the addressee. Done before assistant_turn_started so the
# placeholder reflects the bot the user is actually talking to (host
# in 1:1, host-or-guest in multi-entity).
addressee_id = _detect_addressee_id(prose, host_bot, guest_bot)
# in 1:1, host-or-guest in multi-entity). T74.1 routes the multi-entity
# case through the addressee classifier; the no-guest case still uses
# the substring fast-path because there is nothing to classify when
# only one bot is present (and a classifier round-trip there would
# just be throughput overhead).
if guest_bot is None:
addressee_id = _detect_addressee_id(prose, host_bot, guest_bot)
else:
decision = await detect_addressee(
client,
classifier_model=settings.classifier_model,
user_prose=prose,
host_id=host_bot["id"],
host_name=host_bot["name"],
guest_id=guest_bot["id"],
guest_name=guest_bot["name"],
timeout_s=settings.classifier_timeout_s,
)
addressee_id = decision.addressee_id
addressee_bot = (
guest_bot if (guest_bot is not None and addressee_id == guest_bot["id"])
else host_bot
@@ -598,7 +617,7 @@ async def post_turn(
# Memory write for the interjection beat — a second pair
# of memory_written events (host + guest POVs).
record_turn_memory_for_present(
interject_memory_results = record_turn_memory_for_present(
conn,
chat_id=chat_id,
host_bot_id=host_bot["id"],
@@ -608,6 +627,33 @@ async def post_turn(
chat_clock_at=chat.get("time"),
)
# T74.2: enqueue a significance pass for the interjection
# memory. Mirrors the primary-turn enqueue pattern above —
# we score on the host's memory id since the prose is
# identical across both POVs (per-POV rewrite happens at
# scene close in T45). Without this enqueue the
# interjection beat lands in memory but never gets scored,
# so it can never auto-pin even when it carries a pivotal
# moment.
interject_host_event = interject_memory_results.get(
host_bot["id"]
)
interject_host_memory_id = (
interject_host_event[1] if interject_host_event else None
)
if (
worker is not None
and interject_host_memory_id is not None
):
worker.enqueue(
SignificanceJob(
memory_id=interject_host_memory_id,
narrative_text=interjection_text,
prior_dialogue=recent_post_interject,
host_bot_id=host_bot["id"],
)
)
# 9. Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn
# and the optional interjection so the bots' responses are part of
# the closing scene's final beat — closing before narrative would
@@ -623,6 +669,15 @@ async def post_turn(
# close in the same chat) — we have nothing to close. T13 (kickoff)
# is the only scene-opener path in v1; Phase 2-3 will handle
# automatic re-opening with the next container.
#
# T74.3: this branch deliberately runs even when ``cancelled`` is
# True. Close detection consumes only the user's prose (which is
# fully appended to the event_log BEFORE streaming starts) and the
# current container name; it does NOT consume the bot's output.
# A user who types "we're done here, fade out" and then hits Stop
# mid-stream still meant to close the scene — the cancelled bot
# beat doesn't invalidate that intent. Pinned by
# test_cancelled_turn_still_closes_scene_when_user_prose_signals_close.
if scene is not None and prose.strip():
container = None
if scene.get("container_id") is not None:
+99
View File
@@ -0,0 +1,99 @@
"""Addressee classifier service tests (T74.1).
Covers :func:`chat.services.addressee.detect_addressee`:
- Classifier picks the guest -> ``addressee_id == guest_id``.
- Classifier picks the host -> ``addressee_id == host_id``.
- Classifier flakes (3 bad-JSON responses, exhausting the built-in
retry budget in :func:`chat.llm.classify.classify`) -> fallback to
the host with ``reason="fallback"``.
"""
from __future__ import annotations
import json
import pytest
from chat.llm.mock import MockLLMClient
from chat.services.addressee import AddresseeDecision, detect_addressee
@pytest.mark.asyncio
async def test_classifier_picks_guest():
"""Classifier returns the guest id verbatim — caller propagates it."""
canned = [
json.dumps(
{
"addressee_id": "bot_b",
"confidence": "high",
"reason": "user named BotB",
}
)
]
client = MockLLMClient(canned=canned)
result = await detect_addressee(
client,
classifier_model="test-model",
user_prose="BotB, what do you think?",
host_id="bot_a",
host_name="BotA",
guest_id="bot_b",
guest_name="BotB",
)
assert isinstance(result, AddresseeDecision)
assert result.addressee_id == "bot_b"
assert result.confidence == "high"
@pytest.mark.asyncio
async def test_classifier_picks_host():
"""Classifier returns the host id — caller propagates it."""
canned = [
json.dumps(
{
"addressee_id": "bot_a",
"confidence": "medium",
"reason": "narration aimed at host",
}
)
]
client = MockLLMClient(canned=canned)
result = await detect_addressee(
client,
classifier_model="test-model",
user_prose="I lean back and stretch.",
host_id="bot_a",
host_name="BotA",
guest_id="bot_b",
guest_name="BotB",
)
assert result.addressee_id == "bot_a"
assert result.confidence == "medium"
@pytest.mark.asyncio
async def test_classifier_failure_falls_back_to_host():
"""Three bad-JSON responses exhaust the retry budget and the
classifier-failure fallback returns ``host_id`` with
``reason="fallback"``."""
canned = ["not json", "still not json", "garbage"]
client = MockLLMClient(canned=canned)
result = await detect_addressee(
client,
classifier_model="test-model",
user_prose="anything",
host_id="bot_a",
host_name="BotA",
guest_id="bot_b",
guest_name="BotB",
)
assert result.addressee_id == "bot_a"
assert result.reason == "fallback"
assert result.confidence == "low"
+236 -26
View File
@@ -405,14 +405,15 @@ def test_multi_bot_turn_no_interjection(app_state_setup, tmp_path):
1 user_turn + 1 assistant_turn + 6 *post-turn* edge_updates + 2
memory_written events. Single turn_html broadcast.
Canned queue (8 calls):
Canned queue (11 calls):
1. parse_turn
2. narrative stream (primary, addressee = host because the prose
2. detect_addressee (T74.1) -> host
3. narrative stream (primary, addressee = host because the prose
doesn't name the guest)
3-8. 6 state-update calls (one per directed pair across {you,
4-9. 6 state-update calls (one per directed pair across {you,
bot_a, bot_b})
9. detect_interjection -> should_interject=False
10. detect_scene_close -> should_close=False
10. detect_interjection -> should_interject=False
11. detect_scene_close -> should_close=False
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
@@ -420,6 +421,9 @@ def test_multi_bot_turn_no_interjection(app_state_setup, tmp_path):
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Greetings.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
@@ -474,14 +478,15 @@ def test_multi_bot_turn_with_interjection(app_state_setup, tmp_path):
1 user_turn + 2 assistant_turns + (6 + 6) post-turn edge_updates +
4 memory_written events.
Canned queue (16 calls):
Canned queue (17 calls):
1. parse_turn
2. narrative stream (primary)
3-8. 6 state-update calls (post-primary)
9. detect_interjection -> should_interject=True
10. narrative stream (interjection)
11-16. 6 state-update calls (post-interjection)
17. detect_scene_close -> should_close=False
2. detect_addressee (T74.1) -> host
3. narrative stream (primary)
4-9. 6 state-update calls (post-primary)
10. detect_interjection -> should_interject=True
11. narrative stream (interjection)
12-17. 6 state-update calls (post-interjection)
18. detect_scene_close -> should_close=False
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
@@ -489,6 +494,9 @@ def test_multi_bot_turn_with_interjection(app_state_setup, tmp_path):
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Primary beat.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
@@ -555,14 +563,15 @@ def test_multi_bot_turn_scene_close_writes_per_pov_summaries(
rewrites fire for both bots (memory.pov_summary changes for each).
Interjection short-circuits at False so the queue stays compact.
Canned queue (12 calls):
Canned queue (13 calls):
1. parse_turn
2. narrative stream (primary)
3-8. 6 state-update calls
9. detect_interjection -> False (no follow-on stream)
10. detect_scene_close -> True
11. apply_scene_close_summary host POV
12. apply_scene_close_summary guest POV
2. detect_addressee (T74.1) -> host
3. narrative stream (primary)
4-9. 6 state-update calls
10. detect_interjection -> False (no follow-on stream)
11. detect_scene_close -> True
12. apply_scene_close_summary host POV
13. apply_scene_close_summary guest POV
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
@@ -588,6 +597,9 @@ def test_multi_bot_turn_scene_close_writes_per_pov_summaries(
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Goodnight.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
@@ -639,12 +651,20 @@ def test_multi_bot_turn_scene_close_writes_per_pov_summaries(
def test_addressee_detection_routes_to_named_bot(app_state_setup, tmp_path):
"""Prose that names the guest by name routes the primary turn to the
guest. Interjection (when fired) makes the host the silent witness
and the second assistant_turn carries the host as speaker.
"""T74.1: the multi-entity addressee call goes through the classifier;
when the classifier returns the guest, the primary turn routes there.
Interjection (when fired) makes the host the silent witness and the
second assistant_turn carries the host as speaker.
Canned queue: same shape as the with-interjection test (16 calls)
plus the trailing scene_close decision.
Canned queue (with classifier-led addressee = guest):
1. parse_turn
2. detect_addressee -> bot_b (the guest)
3. narrative stream (primary, addressee = guest)
4-9. 6 state-update calls
10. detect_interjection -> True
11. interjection narrative stream
12-17. 6 state-update calls (post-interjection)
18. detect_scene_close -> False
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
@@ -652,6 +672,13 @@ def test_addressee_detection_routes_to_named_bot(app_state_setup, tmp_path):
)
canned = [
canned_parse,
json.dumps(
{
"addressee_id": "bot_b",
"confidence": "high",
"reason": "user named BotB",
}
),
"BotB pondering.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
@@ -680,9 +707,192 @@ def test_addressee_detection_routes_to_named_bot(app_state_setup, tmp_path):
primary_payload = json.loads(rows[0][0])
interjection_payload = json.loads(rows[1][0])
# Primary speaker is the guest because the prose names BotB and not
# BotA (case-insensitive whole-word match).
# Primary speaker is the guest because the addressee classifier
# picked bot_b for the prose ("BotB, what do you think?").
assert primary_payload["speaker_id"] == "bot_b"
# Interjection follow-on goes to the silent witness — the host.
assert interjection_payload["speaker_id"] == "bot_a"
assert interjection_payload["interjection_of"] == "bot_b"
def test_cancelled_turn_still_closes_scene_when_user_prose_signals_close(
app_state_setup, tmp_path
):
"""T74.3 regression: a cancelled primary stream still triggers scene
close when the user prose carries a hard close signal.
Rationale (also documented in turns.py near the close-detection
branch): close detection only consumes the user's prose, which is
fully appended to the event_log BEFORE streaming starts. The
cancelled bot beat doesn't invalidate the user's intent to close.
Implementation: install a MockLLMClient whose ``stream`` raises
CancelledError on the first iteration. The classifier calls (parse,
addressee, scene_close, per-POV summaries) are still served from
the canned queue. The post_turn route ultimately re-raises
CancelledError after recording the partial — TestClient surfaces
that as an exception, so we drive the request inside ``with
pytest.raises``. Despite the exception, the scene_closed event
must land in the event_log.
"""
from typing import AsyncIterator, Sequence
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "narration", "text": "we are done here, fade out"}]}
)
pov_payload = json.dumps(
{
"summary": "BotA noticed the day winding down.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
pov_payload_guest = json.dumps(
{
"summary": "BotB watched the scene close.",
"knowledge_facts": [],
"relationship_summary": "warmer",
}
)
# Canned queue: parse + addressee + 6 state-updates +
# scene_close=True + 2 per-POV summaries. NO interjection slot
# because the cancel path short-circuits the interjection branch.
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
# NOTE: no narrative slot — the stream is hijacked below to
# raise CancelledError on first iteration; it never pulls a
# canned response.
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_close": True, "reason": "fade out signaled"}),
pov_payload,
pov_payload_guest,
]
class _CancelOnStreamMock:
"""Mock LLM client that serves ``generate`` from a canned queue
and raises CancelledError on the FIRST iteration of ``stream``.
Mirrors :class:`chat.llm.mock.MockLLMClient` for ``generate`` but
diverges on ``stream`` to simulate a mid-stream cancel.
"""
def __init__(self, canned: list[str]) -> None:
self._canned = list(canned)
async def generate(
self, messages: Sequence, *, model: str, **params
) -> str:
return self._canned.pop(0)
async def stream(
self, messages: Sequence, *, model: str, **params
) -> AsyncIterator[str]:
# Yield a CancelledError on first iteration to simulate the
# /turns/cancel route firing mid-stream.
raise asyncio.CancelledError
yield # pragma: no cover — keeps this an async generator.
from chat.web.kickoff import get_llm_client
mock = _CancelOnStreamMock(canned=list(canned))
app.dependency_overrides[get_llm_client] = lambda: mock
try:
# FastAPI/Starlette handles the re-raised CancelledError as an
# internal failure — TestClient surfaces it as a 500 response.
# We don't assert on the status here; the regression is whether
# the scene_closed event still landed in the event_log.
try:
app_state_setup.post(
"/chats/chat_bot_a/turns",
data={"prose": "we are done here, fade out"},
)
except BaseException:
# Some Starlette/asyncio versions propagate the
# CancelledError out of the test client; that's fine — the
# partial-record + scene-close still ran before the raise.
pass
finally:
app.dependency_overrides.clear()
with open_db(tmp_path / "test.db") as conn:
scene_close_count = conn.execute(
"SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'"
).fetchone()[0]
assistant_payload = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'assistant_turn' ORDER BY id"
).fetchall()
# Scene close lands despite the cancel.
assert scene_close_count == 1
# The cancelled assistant_turn was still recorded (truncated=True).
assert len(assistant_payload) == 1
assert json.loads(assistant_payload[0][0])["truncated"] is True
def test_interjection_enqueues_significance_job(app_state_setup, tmp_path):
"""T74.2: when an interjection fires, the interjection memory is
enqueued for significance scoring just like the primary memory.
Capture enqueued ``SignificanceJob``s by replacing the background
worker's ``enqueue`` method with a list-append. Without T74.2, the
interjection memory would never be scored — only the primary's
enqueue would land. We therefore expect TWO jobs after a turn that
has both a primary and an interjection beat: one for the primary
memory, one for the interjection memory.
"""
_seed_chat_with_guest(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "tell me"}]}
)
canned = [
canned_parse,
json.dumps(
{"addressee_id": "bot_a", "confidence": "medium", "reason": "host"}
),
"Primary beat.",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_interject": True, "reason": "jealous"}),
"Interjection beat!",
_zero_state(), _zero_state(), _zero_state(),
_zero_state(), _zero_state(), _zero_state(),
json.dumps({"should_close": False, "reason": "no signal"}),
]
_override_llm(canned)
captured_jobs: list = []
worker = app.state.background_worker
# Re-enable enqueue capture even though the worker's loop is disabled
# — we want to count enqueues without the loop running classifier work.
worker.enabled = True
original_enqueue = worker.enqueue
worker.enqueue = captured_jobs.append # type: ignore[assignment]
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "tell me"}
)
assert response.status_code == 204
finally:
worker.enqueue = original_enqueue # type: ignore[assignment]
worker.enabled = False
app.dependency_overrides.clear()
# Expect 2 enqueues: 1 for the primary memory + 1 for the
# interjection memory.
assert len(captured_jobs) == 2
# Both jobs should reference distinct memory ids — the primary's
# host-POV memory and the interjection's host-POV memory.
memory_ids = [job.memory_id for job in captured_jobs]
assert len(set(memory_ids)) == 2
# The two narrative texts should be the two streamed beats.
narrative_texts = sorted(job.narrative_text for job in captured_jobs)
assert narrative_texts == ["Interjection beat!", "Primary beat."]