feat: T44 multi-entity turn flow with interjection support
Rewrites post_turn for the multi-entity world:

- Addressee detection via case-insensitive whole-word match against the guest name; defaults to host on no-match or both-match.
- Multi-entity prompt assembly: forwards guest_id so the prompt sees the third party's activity / edges / group-node.
- Multi-witness memory write: record_turn_memory_for_present writes one memory per present bot witness when a guest is in the room.
- Multi-pair state-update: compute_state_updates_for_present emits one edge_update per directed pair (6 with a guest, 2 without).
- Interjection branch (T39): when a guest is present and the primary beat completes, the silent witness may follow on. detect_interjection decides; on True we stream a second narrative as the witness, append a second assistant_turn linked to the same user_turn_id, and re-run the multi-pair state update + memory write for the follow-on beat. Cancel collapses both halves; a cancelled interjection skips its downstream passes so we don't classifier-spam against a half-formed beat.
- Scene-close runs after both beats so apply_scene_close_summary sees the full closing scene; T45's guest-aware summarizer handles per-POV rewrites for each present witness.

regenerate.py mirrors the prompt / memory / state-update changes for 1:1 and multi-entity scenes. Per the Phase 2 spec, interjection regeneration is deferred to Phase 2.5 — regenerate only re-streams the addressee turn for v2.

Tests: adds 5 cases to tests/test_turn_flow.py covering the no-guest regression, multi-bot without interjection, multi-bot with interjection, scene-close per-POV rewrites, and addressee routing on prose that names a bot. Each test pins its own canned MockLLMClient queue with the call shape documented in the docstring.
This commit is contained in:
+110
-72
@@ -26,6 +26,22 @@ Phase 1 simplifications (per the plan's "bound it" guidance):
|
||||
so affinity/trust/knowledge reflect the new output.
|
||||
- The route does not broadcast a fresh ``turn_html`` SSE event; T34
|
||||
polishes UI swaps. The user refreshes the page to see the new turn.
|
||||
|
||||
Phase 2 changes (T44):
|
||||
|
||||
- Multi-entity prompt assembly: ``guest_id`` is forwarded to the
|
||||
prompt assembler so the regenerated narrative sees the same
|
||||
guest-aware context the original turn did.
|
||||
- Multi-witness memory write: ``record_turn_memory_for_present`` fans
|
||||
out one ``memory_written`` event per witness when a guest is present.
|
||||
- Multi-pair state-update: ``compute_state_updates_for_present`` emits
|
||||
one ``edge_update`` per directed pair across present entities. With
|
||||
three present that's six edges instead of two.
|
||||
- Interjection regeneration is **deferred to Phase 2.5**. Regenerate
|
||||
only re-streams the addressee turn for v2; ``detect_interjection``
|
||||
is not invoked here. If the prior turn fired an interjection it
|
||||
remains attached to the original assistant_turn (which is superseded
|
||||
alongside the regenerated turn) — Phase 2.5 will revisit.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -35,9 +51,9 @@ from sqlite3 import Connection
|
||||
|
||||
from chat.config import Settings
|
||||
from chat.eventlog.log import append_and_apply, append_event
|
||||
from chat.services.memory_write import record_turn_memory
|
||||
from chat.services.memory_write import record_turn_memory_for_present
|
||||
from chat.services.multi_state_update import compute_state_updates_for_present
|
||||
from chat.services.prompt import assemble_narrative_prompt
|
||||
from chat.services.state_update import compute_state_update
|
||||
from chat.state.edges import get_edge
|
||||
from chat.state.entities import get_bot, get_you
|
||||
from chat.state.world import active_scene, get_chat
|
||||
@@ -72,6 +88,16 @@ async def regenerate_assistant_turn(
|
||||
"persona": "",
|
||||
}
|
||||
|
||||
# Phase 2: surface the guest (if any) so the prompt assembler and
|
||||
# downstream multi-entity passes see the same shape post_turn does.
|
||||
guest_bot_id = chat.get("guest_bot_id")
|
||||
guest_bot: dict | None = None
|
||||
if guest_bot_id is not None:
|
||||
guest_bot = get_bot(conn, guest_bot_id)
|
||||
if guest_bot is None:
|
||||
# Stale guest reference — degrade to single-bot regenerate.
|
||||
guest_bot_id = None
|
||||
|
||||
# 1. Locate the original assistant_turn event.
|
||||
row = conn.execute(
|
||||
"SELECT payload_json FROM event_log "
|
||||
@@ -82,6 +108,17 @@ async def regenerate_assistant_turn(
|
||||
raise ValueError("assistant_turn event not found")
|
||||
original_assistant_payload = json.loads(row[0])
|
||||
original_user_turn_id = original_assistant_payload.get("user_turn_id")
|
||||
# Phase 2 v2 regenerates only the addressee turn — preserve whichever
|
||||
# bot the original turn was attributed to, falling back to the host
|
||||
# for legacy rows that pre-date multi-entity support.
|
||||
speaker_bot_id = original_assistant_payload.get("speaker_id") or host_bot_id
|
||||
if speaker_bot_id == host_bot_id:
|
||||
speaker_bot = host_bot
|
||||
elif guest_bot is not None and speaker_bot_id == guest_bot.get("id"):
|
||||
speaker_bot = guest_bot
|
||||
else:
|
||||
speaker_bot = get_bot(conn, speaker_bot_id) or host_bot
|
||||
speaker_bot_id = speaker_bot.get("id", host_bot_id)
|
||||
|
||||
# 2. Determine the prose for the new prompt and (when edited) capture
|
||||
# the user_turn_edit event up front so the new event ids exist before
|
||||
@@ -137,20 +174,26 @@ async def regenerate_assistant_turn(
|
||||
if kind in ("user_turn", "user_turn_edit"):
|
||||
recent.append({"speaker": you_name, "text": p.get("prose", "")})
|
||||
else:
|
||||
recent.append(
|
||||
{"speaker": host_bot.get("name", "bot"), "text": p.get("text", "")}
|
||||
)
|
||||
spk = p.get("speaker_id", "bot")
|
||||
spk_name = host_bot.get("name", "bot")
|
||||
if spk == host_bot_id:
|
||||
spk_name = host_bot.get("name", "bot")
|
||||
elif guest_bot is not None and spk == guest_bot.get("id"):
|
||||
spk_name = guest_bot.get("name", "bot")
|
||||
recent.append({"speaker": spk_name, "text": p.get("text", "")})
|
||||
|
||||
# 4. Assemble the narrative prompt. ``recent`` already excludes the
|
||||
# current user prose, which we pass through ``user_turn_prose``.
|
||||
# Phase 2: forward ``guest_id`` so the prompt sees the third party.
|
||||
messages = assemble_narrative_prompt(
|
||||
conn,
|
||||
chat_id=chat_id,
|
||||
speaker_bot_id=host_bot_id,
|
||||
speaker_bot_id=speaker_bot_id,
|
||||
user_turn_prose=prose_for_prompt or None,
|
||||
recent_dialogue=recent,
|
||||
budget_soft=settings.narrative_budget_soft,
|
||||
budget_hard=settings.narrative_budget_hard,
|
||||
guest_id=guest_bot_id,
|
||||
)
|
||||
|
||||
# 5. Stream the new narrative.
|
||||
@@ -164,7 +207,7 @@ async def regenerate_assistant_turn(
|
||||
accumulated.append(chunk)
|
||||
await publish(
|
||||
chat_id,
|
||||
{"event": "token", "text": chunk, "speaker_id": host_bot_id},
|
||||
{"event": "token", "text": chunk, "speaker_id": speaker_bot_id},
|
||||
)
|
||||
new_text = "".join(accumulated)
|
||||
|
||||
@@ -177,7 +220,7 @@ async def regenerate_assistant_turn(
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": chat_id,
|
||||
"speaker_id": host_bot_id,
|
||||
"speaker_id": speaker_bot_id,
|
||||
"text": new_text,
|
||||
"truncated": False,
|
||||
"user_turn_id": (
|
||||
@@ -196,88 +239,83 @@ async def regenerate_assistant_turn(
|
||||
)
|
||||
|
||||
# 8. Re-run downstream classifier passes (memory write + state update
|
||||
# for both directed edges). Significance is intentionally skipped on
|
||||
# regenerate (the prior score remains attached to the prior memory).
|
||||
# for every directed pair across present entities). Significance is
|
||||
# intentionally skipped on regenerate (the prior score remains
|
||||
# attached to the prior memory). Phase 2.5 will add interjection
|
||||
# regeneration; v2 leaves any prior interjection beat in place.
|
||||
scene = active_scene(conn, chat_id)
|
||||
record_turn_memory(
|
||||
record_turn_memory_for_present(
|
||||
conn,
|
||||
chat_id=chat_id,
|
||||
host_bot_id=host_bot_id,
|
||||
guest_bot_id=guest_bot_id,
|
||||
narrative_text=new_text,
|
||||
scene_id=scene["id"] if scene else None,
|
||||
chat_clock_at=chat.get("time"),
|
||||
)
|
||||
|
||||
last_at = chat.get("time")
|
||||
speaker_name = (
|
||||
speaker_bot.get("name", "bot") if speaker_bot is not None else "bot"
|
||||
)
|
||||
recent_for_update = recent + [
|
||||
{"speaker": host_bot.get("name", "bot"), "text": new_text}
|
||||
{"speaker": speaker_name, "text": new_text}
|
||||
]
|
||||
|
||||
edge_b2y = get_edge(conn, host_bot_id, "you") or {
|
||||
"affinity": 50,
|
||||
"trust": 50,
|
||||
"summary": "",
|
||||
# Build present-entity inputs for the multi-pair state-update pass.
|
||||
# Host first preserves the Phase 1 directed-pair order (host->you,
|
||||
# then you->host) so existing canned-response fixtures still line up.
|
||||
present_ids: list[str] = [host_bot_id, "you"]
|
||||
present_names: dict[str, str] = {
|
||||
host_bot_id: host_bot.get("name", "bot"),
|
||||
"you": you_name,
|
||||
}
|
||||
update_b2y = await compute_state_update(
|
||||
client,
|
||||
model=settings.classifier_model,
|
||||
source_id=host_bot_id,
|
||||
target_id="you",
|
||||
source_name=host_bot.get("name", "bot"),
|
||||
source_persona=host_bot.get("persona", "") or "",
|
||||
target_name=you_name,
|
||||
prior_affinity=edge_b2y["affinity"],
|
||||
prior_trust=edge_b2y["trust"],
|
||||
prior_summary=edge_b2y.get("summary", "") or "",
|
||||
recent_dialogue=recent_for_update,
|
||||
)
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": host_bot_id,
|
||||
"target_id": "you",
|
||||
"chat_id": chat_id,
|
||||
"affinity_delta": update_b2y.affinity_delta,
|
||||
"trust_delta": update_b2y.trust_delta,
|
||||
"knowledge_facts": update_b2y.knowledge_facts,
|
||||
"last_interaction_at": last_at,
|
||||
"last_interaction_chat_id": chat_id,
|
||||
},
|
||||
)
|
||||
personas: dict[str, str] = {
|
||||
host_bot_id: host_bot.get("persona") or "",
|
||||
"you": you_entity.get("persona") or "",
|
||||
}
|
||||
if guest_bot is not None and guest_bot_id is not None:
|
||||
present_ids.append(guest_bot_id)
|
||||
present_names[guest_bot_id] = guest_bot.get("name", "bot")
|
||||
personas[guest_bot_id] = guest_bot.get("persona") or ""
|
||||
|
||||
edge_y2b = get_edge(conn, "you", host_bot_id) or {
|
||||
"affinity": 50,
|
||||
"trust": 50,
|
||||
"summary": "",
|
||||
}
|
||||
update_y2b = await compute_state_update(
|
||||
prior_edges: dict[tuple[str, str], dict] = {}
|
||||
for src in present_ids:
|
||||
for tgt in present_ids:
|
||||
if src == tgt:
|
||||
continue
|
||||
edge = get_edge(conn, src, tgt) or {
|
||||
"affinity": 50,
|
||||
"trust": 50,
|
||||
"summary": "",
|
||||
}
|
||||
prior_edges[(src, tgt)] = edge
|
||||
|
||||
state_updates = await compute_state_updates_for_present(
|
||||
client,
|
||||
model=settings.classifier_model,
|
||||
source_id="you",
|
||||
target_id=host_bot_id,
|
||||
source_name=you_name,
|
||||
source_persona=you_entity.get("persona", "") or "",
|
||||
target_name=host_bot.get("name", "bot"),
|
||||
prior_affinity=edge_y2b["affinity"],
|
||||
prior_trust=edge_y2b["trust"],
|
||||
prior_summary=edge_y2b.get("summary", "") or "",
|
||||
classifier_model=settings.classifier_model,
|
||||
present_ids=present_ids,
|
||||
present_names=present_names,
|
||||
personas=personas,
|
||||
prior_edges=prior_edges,
|
||||
recent_dialogue=recent_for_update,
|
||||
timeout_s=settings.classifier_timeout_s,
|
||||
)
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": "you",
|
||||
"target_id": host_bot_id,
|
||||
"chat_id": chat_id,
|
||||
"affinity_delta": update_y2b.affinity_delta,
|
||||
"trust_delta": update_y2b.trust_delta,
|
||||
"knowledge_facts": update_y2b.knowledge_facts,
|
||||
"last_interaction_at": last_at,
|
||||
"last_interaction_chat_id": chat_id,
|
||||
},
|
||||
)
|
||||
for src_id, tgt_id, update in state_updates:
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": src_id,
|
||||
"target_id": tgt_id,
|
||||
"chat_id": chat_id,
|
||||
"affinity_delta": update.affinity_delta,
|
||||
"trust_delta": update.trust_delta,
|
||||
"knowledge_facts": update.knowledge_facts,
|
||||
"last_interaction_at": last_at,
|
||||
"last_interaction_chat_id": chat_id,
|
||||
},
|
||||
)
|
||||
|
||||
return new_text
|
||||
|
||||
|
||||
+418
-137
@@ -1,32 +1,47 @@
|
||||
"""POST ``/chats/<id>/turns`` — narrative turn flow with SSE streaming.
|
||||
|
||||
The turn flow strings together the pieces built in T17 (turn parser), T18
|
||||
(prompt assembler), and T16 (SSE channel):
|
||||
(prompt assembler), and T16 (SSE channel). Phase 2 (T44) extends it to
|
||||
multi-entity scenes with optional guest support and a follow-on
|
||||
interjection beat.
|
||||
|
||||
1. Parse the user's prose with the classifier into typed segments.
|
||||
2. Append a ``user_turn`` event capturing both the original prose and the
|
||||
parsed segments.
|
||||
3. Append a placeholder ``assistant_turn_started`` marker so observers know
|
||||
a response is in flight.
|
||||
4. Build the narrative prompt, dropping OOC segments before they reach the
|
||||
bot (per Requirements §6.1 the OOC convention is for the author to talk
|
||||
to the system, not to the in-fiction bot).
|
||||
5. Stream tokens from the LLM, broadcasting each chunk over the chat's SSE
|
||||
4. Detect the addressee (host vs. guest) from the prose using a simple
|
||||
word-boundary substring match — see :func:`_detect_addressee_id`.
|
||||
5. Build the narrative prompt for the addressee, dropping OOC segments
|
||||
before they reach the bot (per Requirements §6.1 the OOC convention is
|
||||
for the author to talk to the system, not to the in-fiction bot).
|
||||
6. Stream tokens from the LLM, broadcasting each chunk over the chat's SSE
|
||||
channel as a ``token`` event so any subscribed browser tab sees them
|
||||
arrive in real time.
|
||||
6. On stream complete, append an ``assistant_turn`` event with the full
|
||||
7. On stream complete, append an ``assistant_turn`` event with the full
|
||||
text and ``truncated=False``. Then run a post-turn state-update pass
|
||||
(Requirements §3.4): one classifier call per directed edge between
|
||||
present entities, each producing an ``edge_update`` event with
|
||||
affinity/trust/knowledge deltas. Finally publish a ``turn_html``
|
||||
event with a ready-to-swap HTML fragment so HTMX's SSE extension can
|
||||
append it to the timeline without a page reload.
|
||||
7. Return ``204 No Content`` — the SSE channel is the real conveyor of
|
||||
state, not the POST response body.
|
||||
affinity/trust/knowledge deltas.
|
||||
8. When a guest is present, run the interjection classifier (§6.2). If it
|
||||
fires we stream a second narrative as the silent witness, append a
|
||||
second ``assistant_turn`` event linked to the same ``user_turn_id``,
|
||||
and re-run memory + state-update for the interjector. The same
|
||||
in-flight task covers both halves so cancel collapses both.
|
||||
9. Scene-close detection runs after the (primary + optional interjection)
|
||||
beats land so the close summary sees the full closing scene. T45's
|
||||
guest-aware ``apply_scene_close_summary`` writes per-POV summaries for
|
||||
each present witness.
|
||||
10. Publish a ``turn_html`` event for each turn so HTMX's SSE extension
|
||||
can append it to the timeline without a page reload.
|
||||
11. Return ``204 No Content`` — the SSE channel is the real conveyor of
|
||||
state, not the POST response body.
|
||||
|
||||
Errors during streaming flip the assistant_turn's ``truncated`` flag to
|
||||
``True`` and we still commit what we received. ``asyncio.CancelledError``
|
||||
is treated identically and re-raised after recording the partial turn.
|
||||
A cancellation mid-interjection skips the interjector's state/memory
|
||||
follow-up so we don't run classifiers against a half-formed beat.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -34,18 +49,20 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import html
|
||||
import json
|
||||
import re
|
||||
|
||||
from fastapi import APIRouter, Depends, Form, HTTPException, Request
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse, Response
|
||||
|
||||
from chat.eventlog.log import append_and_apply, append_event
|
||||
from chat.services.background import SignificanceJob
|
||||
from chat.services.memory_write import record_turn_memory
|
||||
from chat.services.interjection import detect_interjection
|
||||
from chat.services.memory_write import record_turn_memory_for_present
|
||||
from chat.services.multi_state_update import compute_state_updates_for_present
|
||||
from chat.services.prompt import assemble_narrative_prompt
|
||||
from chat.services.rewind import compute_rewind_preview, execute_rewind
|
||||
from chat.services.scene_close import detect_scene_close
|
||||
from chat.services.scene_summarize import apply_scene_close_summary
|
||||
from chat.services.state_update import compute_state_update
|
||||
from chat.services.turn_parse import ParsedTurn, parse_turn
|
||||
from chat.state.edges import get_edge
|
||||
from chat.state.entities import get_bot, get_you
|
||||
@@ -114,6 +131,84 @@ def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
|
||||
return out
|
||||
|
||||
|
||||
def _detect_addressee_id(
|
||||
prose: str, host_bot: dict, guest_bot: dict | None
|
||||
) -> str:
|
||||
"""Return the bot id of the addressee for ``prose``.
|
||||
|
||||
Phase 2 v1 uses a simple case-insensitive whole-word match. The host
|
||||
is the default — addressee flips to guest only when the guest's name
|
||||
appears in the prose AND the host's does not. If both names match
|
||||
or neither matches, the host keeps the floor. This bias keeps the
|
||||
primary speaker stable across ambiguous prose; the interjection
|
||||
branch (later in the turn flow) is how the silent witness gets a word
|
||||
in edgewise when warranted.
|
||||
"""
|
||||
if guest_bot is None:
|
||||
return host_bot["id"]
|
||||
host_name = host_bot.get("name") or ""
|
||||
guest_name = guest_bot.get("name") or ""
|
||||
host_match = bool(
|
||||
host_name
|
||||
and re.search(rf"\b{re.escape(host_name)}\b", prose, re.IGNORECASE)
|
||||
)
|
||||
guest_match = bool(
|
||||
guest_name
|
||||
and re.search(rf"\b{re.escape(guest_name)}\b", prose, re.IGNORECASE)
|
||||
)
|
||||
if guest_match and not host_match:
|
||||
return guest_bot["id"]
|
||||
return host_bot["id"]
|
||||
|
||||
|
||||
def _gather_state_update_inputs(
    conn,
    *,
    host_bot: dict,
    guest_bot: dict | None,
    you_entity: dict,
) -> tuple[list[str], dict[str, str], dict[str, str], dict[tuple[str, str], dict]]:
    """Build the inputs for a multi-entity state-update pass.

    Returns ``(present_ids, present_names, personas, prior_edges)``.

    The roster is always the host followed by ``"you"``, with the guest
    appended last when one is supplied. ``prior_edges`` holds one entry
    per ordered pair of distinct present ids; a pair for which
    ``get_edge`` returns nothing falls back to a 50/50 affinity/trust
    baseline with an empty summary.

    The host-first ordering is deliberate: it makes the directed-pair
    iteration start with host->you and then you->host, the sequence that
    existing canned-response test fixtures are pinned to.
    """
    # One (id, display name, persona) row per present entity, in the
    # order the downstream pass must iterate them.
    roster: list[tuple[str, str, str]] = [
        (host_bot["id"], host_bot["name"], host_bot.get("persona") or ""),
        (
            "you",
            you_entity.get("name") or "you",
            you_entity.get("persona") or "",
        ),
    ]
    if guest_bot is not None:
        roster.append(
            (guest_bot["id"], guest_bot["name"], guest_bot.get("persona") or "")
        )

    present_ids: list[str] = [entity_id for entity_id, _, _ in roster]
    present_names: dict[str, str] = {
        entity_id: display for entity_id, display, _ in roster
    }
    personas: dict[str, str] = {
        entity_id: persona for entity_id, _, persona in roster
    }

    # Every directed pair of distinct ids, looked up source-major so the
    # mapping's insertion order follows the roster order.
    prior_edges: dict[tuple[str, str], dict] = {
        (source, target): get_edge(conn, source, target)
        or {"affinity": 50, "trust": 50, "summary": ""}
        for source in present_ids
        for target in present_ids
        if source != target
    }
    return present_ids, present_names, personas, prior_edges
|
||||
|
||||
|
||||
@router.post("/chats/{chat_id}/turns")
|
||||
async def post_turn(
|
||||
chat_id: str,
|
||||
@@ -137,6 +232,15 @@ async def post_turn(
|
||||
detail=f"host bot not found: {chat['host_bot_id']}",
|
||||
)
|
||||
|
||||
guest_bot = None
|
||||
guest_bot_id = chat.get("guest_bot_id")
|
||||
if guest_bot_id is not None:
|
||||
guest_bot = get_bot(conn, guest_bot_id)
|
||||
# If the chat references a deleted guest we degrade to single-bot
|
||||
# rather than 404 — the chat is still usable as a 1:1.
|
||||
if guest_bot is None:
|
||||
guest_bot_id = None
|
||||
|
||||
settings = request.app.state.settings
|
||||
|
||||
# 1. Parse turn (classifier).
|
||||
@@ -156,7 +260,16 @@ async def post_turn(
|
||||
},
|
||||
)
|
||||
|
||||
# 3. Append assistant_turn_started placeholder. ``user_turn``,
|
||||
# 3. Determine the addressee. Done before assistant_turn_started so the
|
||||
# placeholder reflects the bot the user is actually talking to (host
|
||||
# in 1:1, host-or-guest in multi-entity).
|
||||
addressee_id = _detect_addressee_id(prose, host_bot, guest_bot)
|
||||
addressee_bot = (
|
||||
guest_bot if (guest_bot is not None and addressee_id == guest_bot["id"])
|
||||
else host_bot
|
||||
)
|
||||
|
||||
# 4. Append assistant_turn_started placeholder. ``user_turn``,
|
||||
# ``assistant_turn_started``, and ``assistant_turn`` have no registered
|
||||
# projector handlers — they live in the event_log purely for transcript
|
||||
# rendering — so we don't call ``project`` here. (Re-projecting now would
|
||||
@@ -166,12 +279,15 @@ async def post_turn(
|
||||
kind="assistant_turn_started",
|
||||
payload={
|
||||
"chat_id": chat_id,
|
||||
"speaker_id": host_bot["id"],
|
||||
"speaker_id": addressee_bot["id"],
|
||||
"user_turn_id": user_turn_event_id,
|
||||
},
|
||||
)
|
||||
|
||||
# 4. Build the narrative prompt.
|
||||
# 5. Build the narrative prompt for the addressee. ``guest_id`` is
|
||||
# passed explicitly so the prompt assembler renders the guest's
|
||||
# activity / group-node block when applicable. The assembler is
|
||||
# tolerant of ``guest_id is None`` so this is a no-op for 1:1 chats.
|
||||
recent = _read_recent_dialogue(conn, chat_id, limit=20)
|
||||
# Drop the just-appended user turn from ``recent`` — it's passed as
|
||||
# ``user_turn_prose`` to the assembler and would otherwise duplicate.
|
||||
@@ -180,189 +296,327 @@ async def post_turn(
|
||||
messages = assemble_narrative_prompt(
|
||||
conn,
|
||||
chat_id=chat_id,
|
||||
speaker_bot_id=host_bot["id"],
|
||||
speaker_bot_id=addressee_bot["id"],
|
||||
user_turn_prose=prompt_prose if prompt_prose else None,
|
||||
recent_dialogue=recent,
|
||||
budget_soft=settings.narrative_budget_soft,
|
||||
budget_hard=settings.narrative_budget_hard,
|
||||
guest_id=guest_bot_id,
|
||||
)
|
||||
|
||||
# 5. Stream and accumulate tokens. The stream runs as a Task so the
|
||||
# 6. Stream and accumulate tokens. The stream runs as a Task so the
|
||||
# /turns/cancel route can invoke ``Task.cancel()`` to abort it
|
||||
# mid-stream. ``accumulated`` is a closure over the inner coroutine,
|
||||
# so when the await on ``stream_task`` raises CancelledError below
|
||||
# we still see whatever tokens were appended before cancellation.
|
||||
accumulated: list[str] = []
|
||||
truncated = False
|
||||
primary_accumulated: list[str] = []
|
||||
primary_truncated = False
|
||||
cancelled = False
|
||||
|
||||
async def _stream() -> None:
|
||||
async def _stream_primary() -> None:
|
||||
async for chunk in client.stream(
|
||||
messages,
|
||||
model=settings.narrative_model,
|
||||
max_tokens=settings.narrative_max_tokens,
|
||||
temperature=settings.narrative_temperature,
|
||||
):
|
||||
accumulated.append(chunk)
|
||||
primary_accumulated.append(chunk)
|
||||
await publish(
|
||||
chat_id,
|
||||
{
|
||||
"event": "token",
|
||||
"text": chunk,
|
||||
"speaker_id": host_bot["id"],
|
||||
"speaker_id": addressee_bot["id"],
|
||||
},
|
||||
)
|
||||
|
||||
stream_task = asyncio.create_task(_stream())
|
||||
stream_task = asyncio.create_task(_stream_primary())
|
||||
_in_flight_tasks[chat_id] = stream_task
|
||||
try:
|
||||
await stream_task
|
||||
except asyncio.CancelledError:
|
||||
# Preserve the partial output before letting the cancellation
|
||||
# propagate so the transcript reflects what the user actually saw.
|
||||
truncated = True
|
||||
primary_truncated = True
|
||||
cancelled = True
|
||||
except Exception:
|
||||
# Surface as a truncated turn rather than losing the partial output.
|
||||
truncated = True
|
||||
primary_truncated = True
|
||||
finally:
|
||||
# Always unregister so a subsequent turn can register a fresh task.
|
||||
_in_flight_tasks.pop(chat_id, None)
|
||||
|
||||
full_text = "".join(accumulated)
|
||||
primary_text = "".join(primary_accumulated)
|
||||
|
||||
# 6. Append the assistant_turn with the final text. (See note above on
|
||||
# 7. Append the assistant_turn with the final text. (See note above on
|
||||
# why we skip ``project`` for these transcript-only event kinds.)
|
||||
append_event(
|
||||
conn,
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": chat_id,
|
||||
"speaker_id": host_bot["id"],
|
||||
"text": full_text,
|
||||
"truncated": truncated,
|
||||
"speaker_id": addressee_bot["id"],
|
||||
"text": primary_text,
|
||||
"truncated": primary_truncated,
|
||||
"user_turn_id": user_turn_event_id,
|
||||
},
|
||||
)
|
||||
|
||||
# 6a. Per-turn memory write (Plan §11.1, T21). Phase 1 single-bot:
|
||||
# only the host bot has a memory store, witness flags are
|
||||
# ``[you=1, host=1, guest=0]``, and ``pov_summary`` is the raw
|
||||
# narrative text (T27 will rewrite at scene close). Significance
|
||||
# defaults to 1; T22's async classifier pass will overwrite it.
|
||||
# 7a. Per-turn memory write (Plan §11.1, T21 / T41). With a guest
|
||||
# present this fans out to one ``memory_written`` event per witness
|
||||
# (host + guest); without a guest it preserves the Phase 1 single
|
||||
# write keyed on the host. Witness flags are set inside the helper.
|
||||
scene = active_scene(conn, chat_id)
|
||||
_event_id, memory_id = record_turn_memory(
|
||||
memory_results = record_turn_memory_for_present(
|
||||
conn,
|
||||
chat_id=chat_id,
|
||||
host_bot_id=host_bot["id"],
|
||||
narrative_text=full_text,
|
||||
guest_bot_id=guest_bot_id,
|
||||
narrative_text=primary_text,
|
||||
scene_id=scene["id"] if scene else None,
|
||||
chat_clock_at=chat.get("time"),
|
||||
)
|
||||
|
||||
# 6b. Post-turn state-update pass (Requirements §3.4). For Phase 1
|
||||
# the only present entities are ``you`` and ``host_bot`` so we run
|
||||
# two classifier calls — one per directed edge — and append the
|
||||
# resulting ``edge_update`` events. The recent-dialogue slice is
|
||||
# re-read here so the pass sees the just-appended assistant turn.
|
||||
# We use ``append_and_apply`` (vs append + project) because the
|
||||
# edge_update handler is *not* replay-safe: re-projecting prior
|
||||
# events would re-apply their deltas on top of the live row.
|
||||
recent_for_update = _read_recent_dialogue(conn, chat_id, limit=10)
|
||||
# 7b. Post-turn state-update pass (Requirements §3.4 / T40). All
|
||||
# directed pairs over the present entities — 2 pairs for 1:1, 6 for
|
||||
# 3-entity scenes. Run sequentially via the inner helper which honors
|
||||
# the Featherless 2-conn cap.
|
||||
you_entity = get_you(conn) or {"name": "you", "persona": ""}
|
||||
last_at = chat.get("time")
|
||||
recent_for_update = _read_recent_dialogue(conn, chat_id, limit=10)
|
||||
|
||||
edge_b2y = get_edge(conn, host_bot["id"], "you") or {
|
||||
"affinity": 50,
|
||||
"trust": 50,
|
||||
"summary": "",
|
||||
}
|
||||
update_b2y = await compute_state_update(
|
||||
present_ids, present_names, personas, prior_edges = (
|
||||
_gather_state_update_inputs(
|
||||
conn,
|
||||
host_bot=host_bot,
|
||||
guest_bot=guest_bot,
|
||||
you_entity=you_entity,
|
||||
)
|
||||
)
|
||||
|
||||
state_updates = await compute_state_updates_for_present(
|
||||
client,
|
||||
model=settings.classifier_model,
|
||||
source_id=host_bot["id"],
|
||||
target_id="you",
|
||||
source_name=host_bot["name"],
|
||||
source_persona=host_bot.get("persona", ""),
|
||||
target_name=you_entity.get("name", "you"),
|
||||
prior_affinity=edge_b2y["affinity"],
|
||||
prior_trust=edge_b2y["trust"],
|
||||
prior_summary=edge_b2y.get("summary", "") or "",
|
||||
classifier_model=settings.classifier_model,
|
||||
present_ids=present_ids,
|
||||
present_names=present_names,
|
||||
personas=personas,
|
||||
prior_edges=prior_edges,
|
||||
recent_dialogue=recent_for_update,
|
||||
timeout_s=settings.classifier_timeout_s,
|
||||
)
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": host_bot["id"],
|
||||
"target_id": "you",
|
||||
"chat_id": chat_id,
|
||||
"affinity_delta": update_b2y.affinity_delta,
|
||||
"trust_delta": update_b2y.trust_delta,
|
||||
"knowledge_facts": update_b2y.knowledge_facts,
|
||||
"last_interaction_at": last_at,
|
||||
"last_interaction_chat_id": chat_id,
|
||||
},
|
||||
)
|
||||
for src_id, tgt_id, update in state_updates:
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": src_id,
|
||||
"target_id": tgt_id,
|
||||
"chat_id": chat_id,
|
||||
"affinity_delta": update.affinity_delta,
|
||||
"trust_delta": update.trust_delta,
|
||||
"knowledge_facts": update.knowledge_facts,
|
||||
"last_interaction_at": last_at,
|
||||
"last_interaction_chat_id": chat_id,
|
||||
},
|
||||
)
|
||||
|
||||
edge_y2b = get_edge(conn, "you", host_bot["id"]) or {
|
||||
"affinity": 50,
|
||||
"trust": 50,
|
||||
"summary": "",
|
||||
}
|
||||
update_y2b = await compute_state_update(
|
||||
client,
|
||||
model=settings.classifier_model,
|
||||
source_id="you",
|
||||
target_id=host_bot["id"],
|
||||
source_name=you_entity.get("name", "you"),
|
||||
source_persona=you_entity.get("persona", "") or "",
|
||||
target_name=host_bot["name"],
|
||||
prior_affinity=edge_y2b["affinity"],
|
||||
prior_trust=edge_y2b["trust"],
|
||||
prior_summary=edge_y2b.get("summary", "") or "",
|
||||
recent_dialogue=recent_for_update,
|
||||
)
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": "you",
|
||||
"target_id": host_bot["id"],
|
||||
"chat_id": chat_id,
|
||||
"affinity_delta": update_y2b.affinity_delta,
|
||||
"trust_delta": update_y2b.trust_delta,
|
||||
"knowledge_facts": update_y2b.knowledge_facts,
|
||||
"last_interaction_at": last_at,
|
||||
"last_interaction_chat_id": chat_id,
|
||||
},
|
||||
)
|
||||
|
||||
# 6c. Enqueue the async significance pass (Plan §11.1, T22). The
|
||||
# 7c. Enqueue the async significance pass (Plan §11.1, T22). The
|
||||
# worker scores the just-written memory 0-3, updates significance,
|
||||
# and auto-pins on score 3 with the §8.5 soft-cap eviction rule.
|
||||
# Enqueued before the broadcast so it's outstanding by the time the
|
||||
# client sees ``turn_html`` — but the worker is async, so the user
|
||||
# never blocks on it.
|
||||
# Phase 2 picks the host's memory id as the canonical input — guest
|
||||
# POV memories piggyback on the same significance score (the prose
|
||||
# they record is identical for v2; per-POV rewrite happens at scene
|
||||
# close in T45 and downstream-of-significance).
|
||||
worker = getattr(request.app.state, "background_worker", None)
|
||||
if worker is not None and memory_id is not None:
|
||||
host_event_memory = memory_results.get(host_bot["id"])
|
||||
host_memory_id = host_event_memory[1] if host_event_memory else None
|
||||
if worker is not None and host_memory_id is not None:
|
||||
worker.enqueue(
|
||||
SignificanceJob(
|
||||
memory_id=memory_id,
|
||||
narrative_text=full_text,
|
||||
memory_id=host_memory_id,
|
||||
narrative_text=primary_text,
|
||||
prior_dialogue=recent_for_update,
|
||||
host_bot_id=host_bot["id"],
|
||||
)
|
||||
)
|
||||
|
||||
# 6d. Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn
|
||||
# so the bot's response is the closing scene's final beat — closing
|
||||
# before narrative would force the bot to speak "in no scene", which
|
||||
# is awkward. Hard signals only in Phase 1: container change parsed
|
||||
# from prose, or explicit "fade out" / "we're done here" patterns.
|
||||
# On classifier failure the service returns ``should_close=False``
|
||||
# so the turn flow keeps moving; the manual close button in the
|
||||
# drawer is the always-available fallback.
|
||||
# 8. Interjection branch (T39 / T44). Only fires when the chat has a
|
||||
# guest AND the addressee was the bot we *can* interject for (i.e.
|
||||
# not the lone bot in a 1:1 chat). The silent witness is whichever
|
||||
# bot didn't get the addressee slot. We only run this when the
|
||||
# primary stream actually completed — a cancelled or errored primary
|
||||
# short-circuits the follow-on so we don't classifier-spam against a
|
||||
# half-formed beat.
|
||||
interjection_text: str | None = None
|
||||
interjection_speaker_id: str | None = None
|
||||
interjection_truncated = False
|
||||
if (
|
||||
guest_bot is not None
|
||||
and not cancelled
|
||||
and not primary_truncated
|
||||
and primary_text.strip()
|
||||
):
|
||||
# Identify the silent witness — the bot that is NOT the addressee.
|
||||
if addressee_id == host_bot["id"]:
|
||||
silent_witness = guest_bot
|
||||
else:
|
||||
silent_witness = host_bot
|
||||
|
||||
edge_w_to_addr = get_edge(
|
||||
conn, silent_witness["id"], addressee_bot["id"]
|
||||
) or {"affinity": 50, "trust": 50, "summary": ""}
|
||||
edge_w_to_you = get_edge(conn, silent_witness["id"], "you") or {
|
||||
"affinity": 50,
|
||||
"trust": 50,
|
||||
"summary": "",
|
||||
}
|
||||
|
||||
decision = await detect_interjection(
|
||||
client,
|
||||
classifier_model=settings.classifier_model,
|
||||
addressee_name=addressee_bot["name"],
|
||||
addressee_just_said=primary_text,
|
||||
silent_witness_name=silent_witness["name"],
|
||||
silent_witness_persona=silent_witness.get("persona") or "",
|
||||
silent_witness_edge_to_addressee=edge_w_to_addr,
|
||||
silent_witness_edge_to_you=edge_w_to_you,
|
||||
you_just_said=prose,
|
||||
timeout_s=settings.classifier_timeout_s,
|
||||
)
|
||||
|
||||
if decision.should_interject:
|
||||
interjection_speaker_id = silent_witness["id"]
|
||||
|
||||
# Re-read recent_dialogue so the just-appended assistant_turn
|
||||
# (the addressee's beat) is in the prompt context.
|
||||
interject_recent = _read_recent_dialogue(conn, chat_id, limit=20)
|
||||
if interject_recent and interject_recent[-1].get("speaker") == "you":
|
||||
interject_recent = interject_recent[:-1]
|
||||
interject_messages = assemble_narrative_prompt(
|
||||
conn,
|
||||
chat_id=chat_id,
|
||||
speaker_bot_id=silent_witness["id"],
|
||||
addressee=addressee_bot["id"],
|
||||
user_turn_prose=prompt_prose if prompt_prose else None,
|
||||
recent_dialogue=interject_recent,
|
||||
budget_soft=settings.narrative_budget_soft,
|
||||
budget_hard=settings.narrative_budget_hard,
|
||||
guest_id=guest_bot_id,
|
||||
)
|
||||
|
||||
interject_accumulated: list[str] = []
|
||||
|
||||
async def _stream_interjection() -> None:
|
||||
async for chunk in client.stream(
|
||||
interject_messages,
|
||||
model=settings.narrative_model,
|
||||
max_tokens=settings.narrative_max_tokens,
|
||||
temperature=settings.narrative_temperature,
|
||||
):
|
||||
interject_accumulated.append(chunk)
|
||||
await publish(
|
||||
chat_id,
|
||||
{
|
||||
"event": "token",
|
||||
"text": chunk,
|
||||
"speaker_id": silent_witness["id"],
|
||||
},
|
||||
)
|
||||
|
||||
interject_task = asyncio.create_task(_stream_interjection())
|
||||
_in_flight_tasks[chat_id] = interject_task
|
||||
try:
|
||||
await interject_task
|
||||
except asyncio.CancelledError:
|
||||
interjection_truncated = True
|
||||
cancelled = True
|
||||
except Exception:
|
||||
interjection_truncated = True
|
||||
finally:
|
||||
_in_flight_tasks.pop(chat_id, None)
|
||||
|
||||
interjection_text = "".join(interject_accumulated)
|
||||
|
||||
append_event(
|
||||
conn,
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": chat_id,
|
||||
"speaker_id": silent_witness["id"],
|
||||
"text": interjection_text,
|
||||
"truncated": interjection_truncated,
|
||||
"user_turn_id": user_turn_event_id,
|
||||
"interjection_of": addressee_bot["id"],
|
||||
},
|
||||
)
|
||||
|
||||
# Skip the downstream classifier passes if the interjection
|
||||
# was cancelled mid-stream — we don't want to score a partial
|
||||
# beat the user never got to read in full.
|
||||
if not interjection_truncated:
|
||||
# Re-run the multi-pair state update — the interjector
|
||||
# adding their voice plausibly shifts edges for everyone
|
||||
# in the room. Idempotent enough for v2 (deltas accumulate;
|
||||
# no stale state). Re-read recent so the just-appended
|
||||
# interjection turn is in scope.
|
||||
recent_post_interject = _read_recent_dialogue(
|
||||
conn, chat_id, limit=10
|
||||
)
|
||||
# Re-fetch prior edges so deltas land on the post-primary
|
||||
# state rather than the pre-turn baseline.
|
||||
_, _, _, prior_edges_post = _gather_state_update_inputs(
|
||||
conn,
|
||||
host_bot=host_bot,
|
||||
guest_bot=guest_bot,
|
||||
you_entity=you_entity,
|
||||
)
|
||||
state_updates_post = await compute_state_updates_for_present(
|
||||
client,
|
||||
classifier_model=settings.classifier_model,
|
||||
present_ids=present_ids,
|
||||
present_names=present_names,
|
||||
personas=personas,
|
||||
prior_edges=prior_edges_post,
|
||||
recent_dialogue=recent_post_interject,
|
||||
timeout_s=settings.classifier_timeout_s,
|
||||
)
|
||||
for src_id, tgt_id, update in state_updates_post:
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": src_id,
|
||||
"target_id": tgt_id,
|
||||
"chat_id": chat_id,
|
||||
"affinity_delta": update.affinity_delta,
|
||||
"trust_delta": update.trust_delta,
|
||||
"knowledge_facts": update.knowledge_facts,
|
||||
"last_interaction_at": last_at,
|
||||
"last_interaction_chat_id": chat_id,
|
||||
},
|
||||
)
|
||||
|
||||
# Memory write for the interjection beat — a second pair
|
||||
# of memory_written events (host + guest POVs).
|
||||
record_turn_memory_for_present(
|
||||
conn,
|
||||
chat_id=chat_id,
|
||||
host_bot_id=host_bot["id"],
|
||||
guest_bot_id=guest_bot_id,
|
||||
narrative_text=interjection_text,
|
||||
scene_id=scene["id"] if scene else None,
|
||||
chat_clock_at=chat.get("time"),
|
||||
)
|
||||
|
||||
# 9. Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn
|
||||
# and the optional interjection so the bots' responses are part of
|
||||
# the closing scene's final beat — closing before narrative would
|
||||
# force the bot to speak "in no scene", which is awkward. Hard
|
||||
# signals only in Phase 1: container change parsed from prose, or
|
||||
# explicit "fade out" / "we're done here" patterns. On classifier
|
||||
# failure the service returns ``should_close=False`` so the turn
|
||||
# flow keeps moving; the manual close button in the drawer is the
|
||||
# always-available fallback.
|
||||
#
|
||||
# Skip empty prose — no signal to classify and no point spending a
|
||||
# round-trip. Skip when there's no active scene (e.g. after a prior
|
||||
@@ -393,11 +647,12 @@ async def post_turn(
|
||||
"significance": 0,
|
||||
},
|
||||
)
|
||||
# T27: per-POV summary + edge summary update + knowledge
|
||||
# promotion. Runs synchronously after the close so the
|
||||
# next turn (or a subsequent GET /chats/<id>) sees the
|
||||
# rewritten memories and edge summary. Tolerates classifier
|
||||
# failure (returns the empty default and skips the writes).
|
||||
# T27 / T45: per-POV summary + edge summary update + knowledge
|
||||
# promotion for each present witness (host always; guest when
|
||||
# present). Runs synchronously after the close so the next
|
||||
# turn (or a subsequent GET /chats/<id>) sees the rewritten
|
||||
# memories and edge summaries. Tolerates classifier failure
|
||||
# (returns the empty default and skips the writes).
|
||||
await apply_scene_close_summary(
|
||||
conn,
|
||||
client,
|
||||
@@ -408,24 +663,50 @@ async def post_turn(
|
||||
timeout_s=settings.classifier_timeout_s,
|
||||
)
|
||||
|
||||
# 7. Broadcast a JSON completion event (for JS consumers) and an HTML
|
||||
# fragment event (for HTMX SSE swap-into-timeline).
|
||||
# 10. Broadcast a JSON completion event (for JS consumers) and an HTML
|
||||
# fragment event (for HTMX SSE swap-into-timeline). One pair per
|
||||
# written assistant_turn so the timeline ends up with both the
|
||||
# primary and the interjection beat in the right order.
|
||||
await publish(
|
||||
chat_id,
|
||||
{
|
||||
"event": "assistant_turn_complete",
|
||||
"speaker_id": host_bot["id"],
|
||||
"text": full_text,
|
||||
"truncated": truncated,
|
||||
"speaker_id": addressee_bot["id"],
|
||||
"text": primary_text,
|
||||
"truncated": primary_truncated,
|
||||
},
|
||||
)
|
||||
assistant_html = _render_turn_html(
|
||||
host_bot["name"], full_text, role="bot"
|
||||
primary_html = _render_turn_html(
|
||||
addressee_bot["name"], primary_text, role="bot"
|
||||
)
|
||||
await publish(
|
||||
chat_id, {"event": "turn_html", "data": assistant_html}
|
||||
chat_id, {"event": "turn_html", "data": primary_html}
|
||||
)
|
||||
|
||||
if interjection_text is not None and interjection_speaker_id is not None:
|
||||
# The interjector's display name is whichever bot wasn't the
|
||||
# addressee — pull it from the in-scope variable directly.
|
||||
interject_speaker_name = (
|
||||
host_bot["name"]
|
||||
if interjection_speaker_id == host_bot["id"]
|
||||
else (guest_bot["name"] if guest_bot is not None else "bot")
|
||||
)
|
||||
await publish(
|
||||
chat_id,
|
||||
{
|
||||
"event": "assistant_turn_complete",
|
||||
"speaker_id": interjection_speaker_id,
|
||||
"text": interjection_text,
|
||||
"truncated": interjection_truncated,
|
||||
},
|
||||
)
|
||||
interject_html = _render_turn_html(
|
||||
interject_speaker_name, interjection_text, role="bot"
|
||||
)
|
||||
await publish(
|
||||
chat_id, {"event": "turn_html", "data": interject_html}
|
||||
)
|
||||
|
||||
if cancelled:
|
||||
# Re-raise after the partial-turn has been recorded.
|
||||
raise asyncio.CancelledError
|
||||
|
||||
@@ -202,3 +202,487 @@ def test_get_chat_renders_existing_turns(client, tmp_path):
|
||||
body = response.text
|
||||
assert "hello" in body
|
||||
assert "Hi there." in body
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 2 (T44) — multi-entity turn flow.
|
||||
#
|
||||
# These tests cover the post_turn flow when a guest is present: addressee
|
||||
# detection, multi-pair state-update + multi-witness memory writes, and
|
||||
# the optional interjection follow-on. Each test installs its own
|
||||
# MockLLMClient with a canned-response queue tailored to the call shape
|
||||
# of that scenario; the queue is documented at the top of each test so
|
||||
# the orchestration is auditable.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _bot_payload(bot_id: str, name: str, persona: str = "") -> dict:
|
||||
return {
|
||||
"id": bot_id,
|
||||
"name": name,
|
||||
"persona": persona or f"persona for {name}",
|
||||
"voice_samples": [],
|
||||
"traits": [],
|
||||
"backstory": "",
|
||||
"initial_relationship_to_you": "",
|
||||
"kickoff_prose": "...",
|
||||
}
|
||||
|
||||
|
||||
def _seed_chat_with_guest(db_path: Path) -> None:
    """Seed a host + guest chat into the event log at ``db_path``.

    Appends, in order: ``bot_authored`` for BotA and BotB, ``you_authored``,
    ``chat_created`` (host ``bot_a``, guest ``bot_b``), a container, an open
    scene with all three participants, one ``edge_update`` per directed pair
    (six in total), and one ``activity_change`` per entity. Ends by running
    ``project`` on the connection.
    """
    chat_id = "chat_bot_a"
    opened_at = "2026-04-26T20:00:00+00:00"

    # One row per directed pair. Only bot_a -> you carries a knowledge
    # fact, which exercises the existing-fact preservation path.
    edge_seeds = (
        ("bot_a", "you", ["coworker"]),
        ("you", "bot_a", []),
        ("bot_b", "you", []),
        ("you", "bot_b", []),
        ("bot_a", "bot_b", []),
        ("bot_b", "bot_a", []),
    )
    activity_verbs = (
        ("you", "talking"),
        ("bot_a", "listening"),
        ("bot_b", "listening"),
    )

    with open_db(db_path) as conn:

        def emit(kind: str, payload: dict) -> None:
            # Thin wrapper so each seed step reads as (kind, payload).
            append_event(conn, kind=kind, payload=payload)

        for bot_id, bot_name in (("bot_a", "BotA"), ("bot_b", "BotB")):
            emit("bot_authored", _bot_payload(bot_id, bot_name))
        emit(
            "you_authored",
            {"name": "Me", "pronouns": "they/them", "persona": ""},
        )
        emit(
            "chat_created",
            {
                "id": chat_id,
                "host_bot_id": "bot_a",
                "guest_bot_id": "bot_b",
                "initial_time": opened_at,
                "narrative_anchor": "Day 1",
                "weather": "",
            },
        )

        # Container + open scene so scene-close detection has something
        # to act on in the per-POV summary test.
        emit(
            "container_created",
            {
                "chat_id": chat_id,
                "name": "office",
                "type": "workplace",
                "properties": {},
            },
        )
        emit(
            "scene_opened",
            {
                "chat_id": chat_id,
                "container_id": 1,
                "started_at": opened_at,
                "participants": ["you", "bot_a", "bot_b"],
            },
        )

        for source_id, target_id, known_facts in edge_seeds:
            emit(
                "edge_update",
                {
                    "source_id": source_id,
                    "target_id": target_id,
                    "chat_id": chat_id,
                    "knowledge_facts": known_facts,
                },
            )

        for who, doing in activity_verbs:
            emit(
                "activity_change",
                {
                    "entity_id": who,
                    "posture": "sitting",
                    "action": {
                        "verb": doing,
                        "interruptible": True,
                        "required_attention": "low",
                        "expected_duration": "ongoing",
                    },
                    "attention": "",
                    "holding": [],
                    "status": {},
                },
            )

        project(conn)
|
||||
|
||||
|
||||
def _override_llm(canned: list[str]) -> MockLLMClient:
    """Install a new ``MockLLMClient`` as the app's LLM dependency.

    The mock is built from a copy of ``canned`` and returned, so a test can
    inspect whatever is left in its queue once the request has finished.
    """
    from chat.web.kickoff import get_llm_client

    llm = MockLLMClient(canned=[*canned])

    def _provide_llm():
        # Dependency override: every request gets this same mock instance.
        return llm

    app.dependency_overrides[get_llm_client] = _provide_llm
    return llm
|
||||
|
||||
|
||||
def _zero_state() -> str:
|
||||
return json.dumps(
|
||||
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def app_state_setup(tmp_path, monkeypatch):
    """Yield a ``TestClient`` for the app with no LLM override installed.

    Points ``CHAT_CONFIG_PATH`` at a throwaway config file and
    ``CHAT_DB_PATH`` at ``tmp_path / "test.db"``, then disables the
    background worker while the client is open. No ``MockLLMClient`` is
    installed here: the multi-entity tests pin their own canned queue per
    scenario.
    """
    cfg = tmp_path / "config.toml"
    cfg.write_text('featherless_api_key = "test"\n', encoding="utf-8")
    monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
    db = tmp_path / "test.db"
    monkeypatch.setenv("CHAT_DB_PATH", str(db))
    try:
        with TestClient(app) as c:
            app.state.background_worker.enabled = False
            yield c
    finally:
        # Clear overrides even if closing the client raises, so a failing
        # teardown cannot leak one test's mock LLM into the next test.
        app.dependency_overrides.clear()
|
||||
|
||||
|
||||
def test_single_bot_turn_no_guest_regression(app_state_setup, tmp_path):
    """Regression check for a chat that has no guest.

    Canned queue (4 calls): parse_turn, the narrative stream, then two
    state-update calls. The chat has no guest, so the interjection path is
    bypassed and the queue must be fully drained afterwards.

    Expected event log: one ``user_turn``, one ``assistant_turn``, one
    ``memory_written`` and three ``edge_update`` rows in total (the test
    expects one from ``_seed`` plus two from the post-turn pass).
    """
    db_file = tmp_path / "test.db"
    _seed(db_file)
    parse_reply = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "hello"}]}
    )
    llm = _override_llm(
        [parse_reply, "Hi there.", _zero_state(), _zero_state()]
    )
    try:
        reply = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "hello"}
        )
        assert reply.status_code == 204
    finally:
        app.dependency_overrides.clear()

    # No guest -> no interjection classifier call -> queue fully drained.
    assert llm._canned == []

    with open_db(db_file) as conn:
        logged_kinds = [
            row[0]
            for row in conn.execute(
                "SELECT kind FROM event_log "
                "WHERE kind IN ('user_turn', 'assistant_turn', 'edge_update', "
                " 'memory_written') ORDER BY id"
            ).fetchall()
        ]

    assert logged_kinds.count("user_turn") == 1
    assert logged_kinds.count("assistant_turn") == 1
    # Seed adds exactly one edge_update (bot_a -> you); the post-turn
    # pass adds two more for a total of three.
    assert logged_kinds.count("edge_update") == 3
    assert logged_kinds.count("memory_written") == 1
|
||||
|
||||
|
||||
def test_multi_bot_turn_no_interjection(app_state_setup, tmp_path):
    """Guest present; ``detect_interjection`` declines to interject.

    Expected event log: 1 user_turn, 1 assistant_turn, 6 *post-turn*
    edge_updates and 2 memory_written events. (The ``turn_html`` broadcast
    is not asserted by this test.)

    Canned queue (10 calls):
        1.   parse_turn
        2.   narrative stream (primary, addressee = host because the prose
             doesn't name the guest)
        3-8. 6 state-update calls (one per directed pair across {you,
             bot_a, bot_b})
        9.   detect_interjection -> should_interject=False
        10.  detect_scene_close -> should_close=False
    """
    db_file = tmp_path / "test.db"
    _seed_chat_with_guest(db_file)
    canned_parse = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "hello room"}]}
    )
    canned = [
        canned_parse,
        "Greetings.",
        *(_zero_state() for _ in range(6)),
        json.dumps({"should_interject": False, "reason": "calm"}),
        json.dumps({"should_close": False, "reason": "no signal"}),
    ]
    mock = _override_llm(canned)
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "hello room"}
        )
        assert response.status_code == 204
    finally:
        app.dependency_overrides.clear()
    # All 10 canned slots should have been consumed.
    assert mock._canned == []

    with open_db(db_file) as conn:

        def count_events(kind: str) -> int:
            # Number of event_log rows of the given kind.
            return conn.execute(
                "SELECT COUNT(*) FROM event_log WHERE kind = ?", (kind,)
            ).fetchone()[0]

        # Post-turn edge_updates are those logged after the latest
        # assistant_turn id; the seeded ones come before it.
        max_at = conn.execute(
            "SELECT MAX(id) FROM event_log WHERE kind = 'assistant_turn'"
        ).fetchone()[0]
        post_turn_edge_updates = conn.execute(
            "SELECT COUNT(*) FROM event_log "
            "WHERE kind = 'edge_update' AND id > ?",
            (max_at,),
        ).fetchone()[0]

        user_turn_count = count_events("user_turn")
        assistant_turn_count = count_events("assistant_turn")
        memory_count = count_events("memory_written")

    assert user_turn_count == 1
    assert assistant_turn_count == 1
    assert post_turn_edge_updates == 6
    assert memory_count == 2
|
||||
|
||||
|
||||
def test_multi_bot_turn_with_interjection(app_state_setup, tmp_path):
    """Guest present; ``detect_interjection`` returns True.

    Expected event log: 1 user_turn, 2 assistant_turns, (6 + 6) post-turn
    edge_updates and 4 memory_written events. Both assistant_turn payloads
    share the same ``user_turn_id`` and the second is tagged
    ``interjection_of`` the first speaker.

    Canned queue (17 calls):
        1.     parse_turn
        2.     narrative stream (primary)
        3-8.   6 state-update calls (post-primary)
        9.     detect_interjection -> should_interject=True
        10.    narrative stream (interjection)
        11-16. 6 state-update calls (post-interjection)
        17.    detect_scene_close -> should_close=False
    """
    db_file = tmp_path / "test.db"
    _seed_chat_with_guest(db_file)
    canned_parse = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "tell me"}]}
    )
    canned = [
        canned_parse,
        "Primary beat.",
        *(_zero_state() for _ in range(6)),
        json.dumps({"should_interject": True, "reason": "jealous"}),
        "Interjection beat!",
        *(_zero_state() for _ in range(6)),
        json.dumps({"should_close": False, "reason": "no signal"}),
    ]
    mock = _override_llm(canned)
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "tell me"}
        )
        assert response.status_code == 204
    finally:
        app.dependency_overrides.clear()
    assert mock._canned == []

    with open_db(db_file) as conn:
        memory_count = conn.execute(
            "SELECT COUNT(*) FROM event_log WHERE kind = 'memory_written'"
        ).fetchone()[0]
        # All edge_updates after the FIRST assistant_turn are post-turn.
        first_at = conn.execute(
            "SELECT MIN(id) FROM event_log WHERE kind = 'assistant_turn'"
        ).fetchone()[0]
        post_turn_edges = conn.execute(
            "SELECT COUNT(*) FROM event_log "
            "WHERE kind = 'edge_update' AND id > ?",
            (first_at,),
        ).fetchone()[0]
        rows = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'assistant_turn' ORDER BY id"
        ).fetchall()

    # Check the count before indexing so a missing interjection fails as
    # an assertion rather than an IndexError on rows[1].
    assert len(rows) == 2
    first_payload = json.loads(rows[0][0])
    second_payload = json.loads(rows[1][0])

    assert memory_count == 4
    assert post_turn_edges == 12
    assert first_payload["text"] == "Primary beat."
    assert second_payload["text"] == "Interjection beat!"
    # The silent witness is the bot that wasn't the primary addressee.
    assert second_payload["interjection_of"] == first_payload["speaker_id"]
    assert second_payload["speaker_id"] != first_payload["speaker_id"]
    assert first_payload["user_turn_id"] == second_payload["user_turn_id"]
|
||||
|
||||
|
||||
def test_multi_bot_turn_scene_close_writes_per_pov_summaries(
    app_state_setup, tmp_path
):
    """Guest present; the prose signals a scene close and the classifier agrees.

    Expects exactly one ``scene_closed`` event and one
    ``memory_pov_summary`` manual edit per bot, with each bot's scene-1
    memory carrying its own POV text. The interjection decision is False,
    so no follow-on stream is queued.

    Canned queue (12 calls):
        1.   parse_turn
        2.   narrative stream (primary)
        3-8. 6 state-update calls
        9.   detect_interjection -> False (no follow-on stream)
        10.  detect_scene_close -> True
        11.  apply_scene_close_summary host POV
        12.  apply_scene_close_summary guest POV
    """
    closing_prose = "we are done here, fade out"
    db_file = tmp_path / "test.db"
    _seed_chat_with_guest(db_file)

    def pov_reply(summary_text: str) -> str:
        # Canned per-POV summarizer response; only the summary differs.
        return json.dumps(
            {
                "summary": summary_text,
                "knowledge_facts": [],
                "relationship_summary": "warmer",
            }
        )

    queue = [
        json.dumps({"segments": [{"kind": "narration", "text": closing_prose}]}),
        "Goodnight.",
    ]
    queue.extend(_zero_state() for _ in range(6))
    queue.append(json.dumps({"should_interject": False, "reason": "calm"}))
    queue.append(
        json.dumps({"should_close": True, "reason": "fade out signaled"})
    )
    queue.append(pov_reply("BotA noticed the day winding down."))
    queue.append(pov_reply("BotB watched the scene close."))

    llm = _override_llm(queue)
    try:
        reply = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": closing_prose}
        )
        assert reply.status_code == 204
    finally:
        app.dependency_overrides.clear()
    assert llm._canned == []

    with open_db(db_file) as conn:
        closed_scenes = conn.execute(
            "SELECT COUNT(*) FROM event_log WHERE kind = 'scene_closed'"
        ).fetchone()[0]
        # One memory_pov_summary manual_edit per witness.
        edit_rows = conn.execute(
            "SELECT payload_json FROM event_log WHERE kind = 'manual_edit'"
        ).fetchall()
        pov_edits = []
        for edit_row in edit_rows:
            edit = json.loads(edit_row[0])
            if edit.get("target_kind") == "memory_pov_summary":
                pov_edits.append(edit)
        # After the rewrite, bot_a's scene-1 memory carries the host POV
        # and bot_b's scene-1 memory carries the guest POV.
        pov_by_owner = {
            owner: conn.execute(
                "SELECT pov_summary FROM memories WHERE owner_id = ? AND scene_id = 1",
                (owner,),
            ).fetchone()
            for owner in ("bot_a", "bot_b")
        }

    host_row = pov_by_owner["bot_a"]
    guest_row = pov_by_owner["bot_b"]

    assert closed_scenes == 1
    # Two memory rewrites — one per witness.
    assert len(pov_edits) == 2
    assert host_row is not None and "BotA noticed" in host_row[0]
    assert guest_row is not None and "BotB watched" in guest_row[0]
|
||||
|
||||
|
||||
def test_addressee_detection_routes_to_named_bot(app_state_setup, tmp_path):
    """Prose that names the guest routes the primary turn to the guest.

    With the interjection fired, the host becomes the silent witness, so
    the second assistant_turn carries the host as speaker and is tagged
    ``interjection_of`` the guest.

    Canned queue (17 calls):
        1.     parse_turn
        2.     narrative stream (primary, spoken by the guest)
        3-8.   6 state-update calls (post-primary)
        9.     detect_interjection -> should_interject=True
        10.    narrative stream (interjection, spoken by the host)
        11-16. 6 state-update calls (post-interjection)
        17.    detect_scene_close -> should_close=False
    """
    db_file = tmp_path / "test.db"
    _seed_chat_with_guest(db_file)
    canned_parse = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "BotB, what do you think?"}]}
    )
    canned = [
        canned_parse,
        "BotB pondering.",
        *(_zero_state() for _ in range(6)),
        json.dumps({"should_interject": True, "reason": "host wants in"}),
        "BotA chiming in.",
        *(_zero_state() for _ in range(6)),
        json.dumps({"should_close": False, "reason": "no signal"}),
    ]
    mock = _override_llm(canned)
    try:
        response = app_state_setup.post(
            "/chats/chat_bot_a/turns",
            data={"prose": "BotB, what do you think?"},
        )
        assert response.status_code == 204
    finally:
        app.dependency_overrides.clear()
    assert mock._canned == []

    with open_db(db_file) as conn:
        rows = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'assistant_turn' ORDER BY id"
        ).fetchall()

    # Check the count before indexing so a missing interjection fails as
    # an assertion rather than an IndexError on rows[1].
    assert len(rows) == 2
    primary_payload = json.loads(rows[0][0])
    interjection_payload = json.loads(rows[1][0])

    # Primary speaker is the guest because the prose names BotB and not
    # BotA (case-insensitive whole-word match).
    assert primary_payload["speaker_id"] == "bot_b"
    # Interjection follow-on goes to the silent witness — the host.
    assert interjection_payload["speaker_id"] == "bot_a"
    assert interjection_payload["interjection_of"] == "bot_b"
|
||||
|
||||
Reference in New Issue
Block a user