chat/chat/web/turns.py
Joseph Doherty c86b0df411 feat: T44 multi-entity turn flow with interjection support
Rewrites post_turn for the multi-entity world:

- Addressee detection via case-insensitive whole-word match against the
  guest name; defaults to host on no-match or both-match.
- Multi-entity prompt assembly: forwards guest_id so the prompt sees
  the third party's activity / edges / group-node.
- Multi-witness memory write: record_turn_memory_for_present writes one
  memory per present bot witness when a guest is in the room.
- Multi-pair state-update: compute_state_updates_for_present emits one
  edge_update per directed pair (6 with a guest, 2 without).
- Interjection branch (T39): when a guest is present and the primary
  beat completes, the silent witness may follow on. detect_interjection
  decides; on True we stream a second narrative as the witness, append a
  second assistant_turn linked to the same user_turn_id, and re-run the
  multi-pair state update + memory write for the follow-on beat. Cancel
  collapses both halves; a cancelled interjection skips its downstream
  passes so we don't classifier-spam against a half-formed beat.
- Scene-close runs after both beats so apply_scene_close_summary sees
  the full closing scene; T45's guest-aware summarizer handles per-POV
  rewrites for each present witness.
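The addressee rule and the pair counts described above can be sketched in a few lines of standalone Python. This is a hedged illustration, not the module's code: `mentions`, `pick_addressee`, and `directed_pairs` are hypothetical names, and bots are reduced to bare name strings. `itertools.permutations(present, 2)` yields the ordered pairs in the same order as a nested source/target loop that skips `src == tgt`, so listing the host first keeps host-first ordering.

```python
from __future__ import annotations

import itertools
import re


def mentions(name: str, prose: str) -> bool:
    """Case-insensitive whole-word match of ``name`` inside ``prose``."""
    if not name:
        return False
    return re.search(rf"\b{re.escape(name)}\b", prose, re.IGNORECASE) is not None


def pick_addressee(prose: str, host: str, guest: str | None) -> str:
    """Hypothetical helper: name of the bot the prose is addressed to.

    The guest wins only when its name appears and the host's does not;
    no match or a double match leaves the floor with the host.
    """
    if guest is None:
        return host
    if mentions(guest, prose) and not mentions(host, prose):
        return guest
    return host


def directed_pairs(present: list[str]) -> list[tuple[str, str]]:
    """Every ordered (source, target) pair of distinct entities, in input order."""
    return list(itertools.permutations(present, 2))


print(pick_addressee("Mira, pass the salt.", "Ash", "Mira"))     # Mira
print(pick_addressee("Ash and Mira, sit down.", "Ash", "Mira"))  # Ash
print(pick_addressee("Mirabel waves.", "Ash", "Mira"))           # Ash
print(len(directed_pairs(["ash", "you", "mira"])))               # 6
print(directed_pairs(["ash", "you"]))  # [('ash', 'you'), ('you', 'ash')]
```

The whole-word boundary is what keeps "Mirabel" from routing the turn to a guest named "Mira".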
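The cancel behaviour ("cancel collapses both halves") rests on a small asyncio pattern: each beat streams inside a task registered under the chat id, a separate Stop call looks the task up and calls `Task.cancel()`, and the awaiting side catches `CancelledError`, keeps whatever chunks already arrived, and marks the beat truncated. The sketch below is a standalone approximation under that reading; `IN_FLIGHT`, `fake_stream`, `run_beat`, `cancel`, and `demo` are hypothetical stand-ins, not the module's names.

```python
from __future__ import annotations

import asyncio

# Hypothetical registry, keyed by chat id, of the beat currently streaming.
IN_FLIGHT: dict[str, asyncio.Task] = {}


async def fake_stream(chunks: list[str]):
    """Hypothetical token source that yields one chunk per event-loop turn."""
    for chunk in chunks:
        await asyncio.sleep(0)
        yield chunk


async def run_beat(chat_id: str, chunks: list[str]) -> tuple[str, bool, bool]:
    """Stream one beat; return (text, truncated, cancelled)."""
    received: list[str] = []

    async def _pump() -> None:
        async for chunk in fake_stream(chunks):
            received.append(chunk)

    task = asyncio.create_task(_pump())
    IN_FLIGHT[chat_id] = task
    truncated = cancelled = False
    try:
        await task
    except asyncio.CancelledError:
        # Keep the partial text; the caller decides whether to re-raise.
        truncated = cancelled = True
    except Exception:
        truncated = True
    finally:
        IN_FLIGHT.pop(chat_id, None)  # always free the slot
    return "".join(received), truncated, cancelled


def cancel(chat_id: str) -> None:
    """Fire-and-forget Stop: a silent no-op when nothing is in flight."""
    task = IN_FLIGHT.get(chat_id)
    if task is not None and not task.done():
        task.cancel()


async def demo() -> tuple[str, bool, bool]:
    beat = asyncio.create_task(run_beat("c1", ["a", "b", "c", "d"]))
    for _ in range(3):  # let a chunk or two arrive, then press Stop
        await asyncio.sleep(0)
    cancel("c1")
    return await beat


print(asyncio.run(demo()))  # partial text, truncated=True, cancelled=True
print(asyncio.run(run_beat("c2", ["x", "y"])))  # ('xy', False, False)
```

Because a second beat would register under the same key only after the first one unregisters, a single Stop press reaches whichever half is streaming at that moment.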

regenerate.py mirrors the prompt / memory / state-update changes for
1:1 and multi-entity scenes. Per the Phase 2 spec, interjection
regeneration is deferred to Phase 2.5; for v2, regenerate only re-streams
the addressee turn.

Tests: adds 5 cases to tests/test_turn_flow.py covering the no-guest
regression, multi-bot without interjection, multi-bot with interjection,
scene-close per-POV rewrites, and addressee routing on prose that names a
bot. Each test pins its own canned MockLLMClient queue, with the call
shape documented in its docstring.
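The canned-queue testing approach can be sketched as a small fake client whose `stream` method matches the call shape the turn flow uses (`client.stream(messages, model=..., max_tokens=..., temperature=...)`, consumed with `async for`). `CannedClient` and `collect` are hypothetical; the real `MockLLMClient` may differ, and the classifier-side calls are not modelled here.

```python
from __future__ import annotations

import asyncio
from collections import deque


class CannedClient:
    """Hypothetical fake LLM client that replays canned responses in order.

    Each ``stream`` call pops the next queued response and yields it in
    fixed-size chunks. Queuing responses in the expected call order makes
    a test fail loudly if the turn flow's call sequence changes.
    """

    def __init__(self, responses: list[str], chunk_size: int = 4) -> None:
        self._queue: deque[str] = deque(responses)
        self._chunk_size = chunk_size
        self.calls: list[dict] = []  # recorded call shapes for assertions

    async def stream(self, messages, *, model, max_tokens, temperature):
        if not self._queue:
            raise AssertionError("CannedClient queue exhausted")
        self.calls.append(
            {"messages": messages, "model": model, "max_tokens": max_tokens}
        )
        text = self._queue.popleft()
        for i in range(0, len(text), self._chunk_size):
            yield text[i : i + self._chunk_size]


async def collect(client: CannedClient, messages: list[dict]) -> str:
    """Drain one stream by appending chunks, then join them."""
    chunks: list[str] = []
    async for chunk in client.stream(
        messages, model="narrative", max_tokens=256, temperature=0.8
    ):
        chunks.append(chunk)
    return "".join(chunks)


client = CannedClient(["The host nods.", "The guest cuts in."])
print(asyncio.run(collect(client, [{"role": "user", "content": "hi"}])))  # The host nods.
print(asyncio.run(collect(client, [])))  # The guest cuts in.
print(len(client.calls))  # 2
```

A test that expects an interjection would queue the addressee's beat first and the witness's beat second; an unexpected extra call surfaces as the exhaustion error.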
2026-04-26 16:18:38 -04:00

864 lines
34 KiB
Python

"""POST ``/chats/<id>/turns``: narrative turn flow with SSE streaming.

The turn flow strings together the pieces built in T17 (turn parser), T18
(prompt assembler), and T16 (SSE channel). Phase 2 (T44) extends it to
multi-entity scenes with optional guest support and a follow-on
interjection beat.

1. Parse the user's prose with the classifier into typed segments.
2. Append a ``user_turn`` event capturing both the original prose and the
   parsed segments.
3. Detect the addressee (host vs. guest) from the prose using a simple
   case-insensitive whole-word match; see :func:`_detect_addressee_id`.
4. Append a placeholder ``assistant_turn_started`` marker, tagged with the
   addressee's ``speaker_id``, so observers know a response is in flight.
5. Build the narrative prompt for the addressee, dropping OOC segments
   before they reach the bot (per Requirements §6.1 the OOC convention is
   for the author to talk to the system, not to the in-fiction bot).
6. Stream tokens from the LLM, broadcasting each chunk over the chat's SSE
   channel as a ``token`` event so any subscribed browser tab sees them
   arrive in real time.
7. When the stream ends, append an ``assistant_turn`` event with the full
   text (``truncated=False`` on a clean finish), write one memory per
   present witness, and run a post-turn state-update pass (Requirements
   §3.4): one classifier call per directed edge between present
   entities, each producing an ``edge_update`` event with
   affinity/trust/knowledge deltas.
8. When a guest is present and the primary beat finished cleanly, run the
   interjection classifier (§6.2). If it fires we stream a second
   narrative as the silent witness, append a second ``assistant_turn``
   event linked to the same ``user_turn_id``, and re-run state-update +
   memory for the interjection beat. Both halves register in the same
   per-chat in-flight task slot, so cancel collapses both.
9. Scene-close detection runs after the (primary + optional interjection)
   beats land so the close summary sees the full closing scene. T45's
   guest-aware ``apply_scene_close_summary`` writes per-POV summaries for
   each present witness.
10. Publish an ``assistant_turn_complete`` event and a ``turn_html`` event
    for each assistant turn so HTMX's SSE extension can append it to the
    timeline without a page reload.
11. Return ``204 No Content``: the SSE channel is the real conveyor of
    state, not the POST response body.

Errors during streaming flip the assistant_turn's ``truncated`` flag to
``True`` and we still commit what we received. ``asyncio.CancelledError``
is treated identically, and is re-raised once the partial turn has been
recorded and broadcast. A truncated interjection (cancelled or errored)
skips the interjector's state/memory follow-up so we don't run
classifiers against a half-formed beat.
"""
from __future__ import annotations

import asyncio
import html
import json
import re

from fastapi import APIRouter, Depends, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse, Response

from chat.eventlog.log import append_and_apply, append_event
from chat.services.background import SignificanceJob
from chat.services.interjection import detect_interjection
from chat.services.memory_write import record_turn_memory_for_present
from chat.services.multi_state_update import compute_state_updates_for_present
from chat.services.prompt import assemble_narrative_prompt
from chat.services.rewind import compute_rewind_preview, execute_rewind
from chat.services.scene_close import detect_scene_close
from chat.services.scene_summarize import apply_scene_close_summary
from chat.services.turn_parse import ParsedTurn, parse_turn
from chat.state.edges import get_edge
from chat.state.entities import get_bot, get_you
from chat.state.world import active_scene, get_chat, get_container
from chat.web.bots import get_conn
from chat.web.kickoff import get_llm_client
from chat.web.pubsub import publish
from chat.web.render import render_turn_html as _render_turn_html

router = APIRouter()

# Module-level registry of in-flight streaming tasks, keyed by chat_id.
# The POST /chats/<id>/turns/cancel route looks up the task and calls
# .cancel(); the streaming coroutine in post_turn catches the resulting
# CancelledError, commits the partial as truncated, and unregisters.
# Single-process v1 only; sufficient for one user with multiple tabs.
_in_flight_tasks: dict[str, asyncio.Task] = {}


def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str:
    """Concatenate non-OOC segments back to a prose string for the prompt.

    OOC segments (``((double parens))``) are kept in the user_turn payload
    for transcript display but stripped before assembly so the bot never
    sees author-to-system messages.
    """
    keep = [s.text for s in parsed.segments if s.kind != "ooc"]
    return " ".join(keep).strip()


def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
    """Return user-side and assistant_turn events for ``chat_id``.

    Includes ``user_turn``, ``user_turn_edit`` (T29 edited prose), and
    ``assistant_turn``. Ordered oldest-first; superseded/hidden rows are
    skipped so regenerated turns (T29) drop out of the rendered timeline.
    Each entry is shaped ``{"speaker": <id-or-"you">, "text": <prose>}``
    for the prompt assembler and the chat-detail template.
    """
    # Note: LIMIT is applied across all chats before the ``chat_id``
    # filter below, so with several active chats this can return fewer
    # than ``limit`` entries for ``chat_id``.
    cur = conn.execute(
        "SELECT id, kind, payload_json FROM event_log "
        "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
        " AND superseded_by IS NULL AND hidden = 0 "
        "ORDER BY id DESC LIMIT ?",
        (limit,),
    )
    rows = cur.fetchall()
    rows.reverse()  # back to chronological order
    out: list[dict] = []
    for _row_id, kind, payload_json in rows:
        p = json.loads(payload_json)
        if p.get("chat_id") != chat_id:
            continue
        if kind in ("user_turn", "user_turn_edit"):
            # Edited prose substitutes for the original user_turn (the
            # original is marked superseded_by and filtered above).
            out.append({"speaker": "you", "text": p.get("prose", "")})
        else:
            out.append(
                {
                    "speaker": p.get("speaker_id", "bot"),
                    "text": p.get("text", ""),
                }
            )
    return out


def _detect_addressee_id(
    prose: str, host_bot: dict, guest_bot: dict | None
) -> str:
    """Return the bot id of the addressee for ``prose``.

    Phase 2 v1 uses a simple case-insensitive whole-word match. The host
    is the default: the addressee flips to the guest only when the guest's
    name appears in the prose AND the host's does not. If both names match
    or neither matches, the host keeps the floor. This bias keeps the
    primary speaker stable across ambiguous prose; the interjection
    branch (later in the turn flow) is how the silent witness gets a word
    in edgewise when warranted.
    """
    if guest_bot is None:
        return host_bot["id"]
    host_name = host_bot.get("name") or ""
    guest_name = guest_bot.get("name") or ""
    host_match = bool(
        host_name
        and re.search(rf"\b{re.escape(host_name)}\b", prose, re.IGNORECASE)
    )
    guest_match = bool(
        guest_name
        and re.search(rf"\b{re.escape(guest_name)}\b", prose, re.IGNORECASE)
    )
    if guest_match and not host_match:
        return guest_bot["id"]
    return host_bot["id"]


def _gather_state_update_inputs(
    conn,
    *,
    host_bot: dict,
    guest_bot: dict | None,
    you_entity: dict,
) -> tuple[list[str], dict[str, str], dict[str, str], dict[tuple[str, str], dict]]:
    """Collect ``(present_ids, present_names, personas, prior_edges)`` for
    a multi-entity state-update pass.

    Phase 2 v1 always pairs ``you`` with the host and (when present) the
    guest. ``prior_edges`` falls back to the schema default 50/50 baseline
    when no row exists yet, which mirrors the Phase 1 single-pair flow.

    Order matters: the host comes first so the directed-pair iteration
    in :func:`compute_state_updates_for_present` matches the Phase 1
    sequence (host->you, then you->host). Existing tests pin the
    canned-response queue to that order; keeping it stable means we don't
    have to reshuffle test fixtures across the Phase 2 cutover.
    """
    present_ids: list[str] = [host_bot["id"], "you"]
    present_names: dict[str, str] = {
        host_bot["id"]: host_bot["name"],
        "you": you_entity.get("name") or "you",
    }
    personas: dict[str, str] = {
        host_bot["id"]: host_bot.get("persona") or "",
        "you": you_entity.get("persona") or "",
    }
    if guest_bot is not None:
        present_ids.append(guest_bot["id"])
        present_names[guest_bot["id"]] = guest_bot["name"]
        personas[guest_bot["id"]] = guest_bot.get("persona") or ""
    prior_edges: dict[tuple[str, str], dict] = {}
    for src in present_ids:
        for tgt in present_ids:
            if src == tgt:
                continue
            edge = get_edge(conn, src, tgt) or {
                "affinity": 50,
                "trust": 50,
                "summary": "",
            }
            prior_edges[(src, tgt)] = edge
    return present_ids, present_names, personas, prior_edges
@router.post("/chats/{chat_id}/turns")
async def post_turn(
    chat_id: str,
    request: Request,
    prose: str = Form(""),
    conn=Depends(get_conn),
    client=Depends(get_llm_client),
):
    if not prose.strip():
        raise HTTPException(status_code=400, detail="prose cannot be empty")
    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    host_bot = get_bot(conn, chat["host_bot_id"])
    if host_bot is None:
        # Defensive: chat row references a missing bot.
        raise HTTPException(
            status_code=404,
            detail=f"host bot not found: {chat['host_bot_id']}",
        )
    guest_bot = None
    guest_bot_id = chat.get("guest_bot_id")
    if guest_bot_id is not None:
        guest_bot = get_bot(conn, guest_bot_id)
        # If the chat references a deleted guest we degrade to single-bot
        # rather than 404; the chat is still usable as a 1:1.
        if guest_bot is None:
            guest_bot_id = None
    settings = request.app.state.settings

    # 1. Parse turn (classifier).
    parsed = await parse_turn(
        client, model=settings.classifier_model, prose=prose
    )
    prompt_prose = _strip_ooc_for_prompt(parsed)

    # 2. Append user_turn event.
    user_turn_event_id = append_event(
        conn,
        kind="user_turn",
        payload={
            "chat_id": chat_id,
            "prose": prose,
            "segments": [s.model_dump() for s in parsed.segments],
        },
    )

    # 3. Determine the addressee. Done before assistant_turn_started so the
    # placeholder reflects the bot the user is actually talking to (host
    # in 1:1, host-or-guest in multi-entity).
    addressee_id = _detect_addressee_id(prose, host_bot, guest_bot)
    addressee_bot = (
        guest_bot if (guest_bot is not None and addressee_id == guest_bot["id"])
        else host_bot
    )

    # 4. Append assistant_turn_started placeholder. ``user_turn``,
    # ``assistant_turn_started``, and ``assistant_turn`` have no registered
    # projector handlers (they live in the event_log purely for transcript
    # rendering), so we don't call ``project`` here. (Re-projecting now would
    # also re-run prior non-idempotent inserts like ``chat_created``.)
    append_event(
        conn,
        kind="assistant_turn_started",
        payload={
            "chat_id": chat_id,
            "speaker_id": addressee_bot["id"],
            "user_turn_id": user_turn_event_id,
        },
    )

    # 5. Build the narrative prompt for the addressee. ``guest_id`` is
    # passed explicitly so the prompt assembler renders the guest's
    # activity / group-node block when applicable. The assembler is
    # tolerant of ``guest_id is None``, so this is a no-op for 1:1 chats.
    recent = _read_recent_dialogue(conn, chat_id, limit=20)
    # Drop the just-appended user turn from ``recent``: it's passed as
    # ``user_turn_prose`` to the assembler and would otherwise duplicate.
    if recent and recent[-1].get("speaker") == "you":
        recent = recent[:-1]
    messages = assemble_narrative_prompt(
        conn,
        chat_id=chat_id,
        speaker_bot_id=addressee_bot["id"],
        user_turn_prose=prompt_prose if prompt_prose else None,
        recent_dialogue=recent,
        budget_soft=settings.narrative_budget_soft,
        budget_hard=settings.narrative_budget_hard,
        guest_id=guest_bot_id,
    )

    # 6. Stream and accumulate tokens. The stream runs as a Task so the
    # /turns/cancel route can invoke ``Task.cancel()`` to abort it
    # mid-stream. The inner coroutine closes over ``primary_accumulated``,
    # so when the await on ``stream_task`` raises CancelledError below
    # we still see whatever tokens were appended before cancellation.
    primary_accumulated: list[str] = []
    primary_truncated = False
    cancelled = False

    async def _stream_primary() -> None:
        async for chunk in client.stream(
            messages,
            model=settings.narrative_model,
            max_tokens=settings.narrative_max_tokens,
            temperature=settings.narrative_temperature,
        ):
            primary_accumulated.append(chunk)
            await publish(
                chat_id,
                {
                    "event": "token",
                    "text": chunk,
                    "speaker_id": addressee_bot["id"],
                },
            )

    stream_task = asyncio.create_task(_stream_primary())
    _in_flight_tasks[chat_id] = stream_task
    try:
        await stream_task
    except asyncio.CancelledError:
        # Preserve the partial output before letting the cancellation
        # propagate so the transcript reflects what the user actually saw.
        primary_truncated = True
        cancelled = True
    except Exception:
        # Surface as a truncated turn rather than losing the partial output.
        primary_truncated = True
    finally:
        # Always unregister so a subsequent turn can register a fresh task.
        _in_flight_tasks.pop(chat_id, None)
    primary_text = "".join(primary_accumulated)

    # 7. Append the assistant_turn with the final text. (See note above on
    # why we skip ``project`` for these transcript-only event kinds.)
    append_event(
        conn,
        kind="assistant_turn",
        payload={
            "chat_id": chat_id,
            "speaker_id": addressee_bot["id"],
            "text": primary_text,
            "truncated": primary_truncated,
            "user_turn_id": user_turn_event_id,
        },
    )

    # 7a. Per-turn memory write (Plan §11.1, T21 / T41). With a guest
    # present this fans out to one ``memory_written`` event per witness
    # (host + guest); without a guest it preserves the Phase 1 single
    # write keyed on the host. Witness flags are set inside the helper.
    scene = active_scene(conn, chat_id)
    memory_results = record_turn_memory_for_present(
        conn,
        chat_id=chat_id,
        host_bot_id=host_bot["id"],
        guest_bot_id=guest_bot_id,
        narrative_text=primary_text,
        scene_id=scene["id"] if scene else None,
        chat_clock_at=chat.get("time"),
    )

    # 7b. Post-turn state-update pass (Requirements §3.4 / T40), covering
    # all directed pairs over the present entities: 2 pairs for 1:1, 6 for
    # 3-entity scenes. The helper runs the pairs sequentially, which honors
    # the Featherless 2-conn cap.
    you_entity = get_you(conn) or {"name": "you", "persona": ""}
    last_at = chat.get("time")
    recent_for_update = _read_recent_dialogue(conn, chat_id, limit=10)
    present_ids, present_names, personas, prior_edges = (
        _gather_state_update_inputs(
            conn,
            host_bot=host_bot,
            guest_bot=guest_bot,
            you_entity=you_entity,
        )
    )
    state_updates = await compute_state_updates_for_present(
        client,
        classifier_model=settings.classifier_model,
        present_ids=present_ids,
        present_names=present_names,
        personas=personas,
        prior_edges=prior_edges,
        recent_dialogue=recent_for_update,
        timeout_s=settings.classifier_timeout_s,
    )
    for src_id, tgt_id, update in state_updates:
        append_and_apply(
            conn,
            kind="edge_update",
            payload={
                "source_id": src_id,
                "target_id": tgt_id,
                "chat_id": chat_id,
                "affinity_delta": update.affinity_delta,
                "trust_delta": update.trust_delta,
                "knowledge_facts": update.knowledge_facts,
                "last_interaction_at": last_at,
                "last_interaction_chat_id": chat_id,
            },
        )

    # 7c. Enqueue the async significance pass (Plan §11.1, T22). The
    # worker scores the just-written memory 0-3, updates significance,
    # and auto-pins on score 3 with the §8.5 soft-cap eviction rule.
    # Phase 2 picks the host's memory id as the canonical input; guest
    # POV memories piggyback on the same significance score (the prose
    # they record is identical for v2; per-POV rewrite happens at scene
    # close in T45, downstream of significance).
    worker = getattr(request.app.state, "background_worker", None)
    host_event_memory = memory_results.get(host_bot["id"])
    host_memory_id = host_event_memory[1] if host_event_memory else None
    if worker is not None and host_memory_id is not None:
        worker.enqueue(
            SignificanceJob(
                memory_id=host_memory_id,
                narrative_text=primary_text,
                prior_dialogue=recent_for_update,
                host_bot_id=host_bot["id"],
            )
        )

    # 8. Interjection branch (T39 / T44). Only fires when the chat has a
    # guest; in a 1:1 chat there is no second bot to interject. The
    # silent witness is whichever bot didn't get the addressee slot. We
    # only run this when the primary stream actually completed: a
    # cancelled or errored primary short-circuits the follow-on so we
    # don't classifier-spam against a half-formed beat.
    interjection_text: str | None = None
    interjection_speaker_id: str | None = None
    interjection_truncated = False
    if (
        guest_bot is not None
        and not cancelled
        and not primary_truncated
        and primary_text.strip()
    ):
        # Identify the silent witness: the bot that is NOT the addressee.
        if addressee_id == host_bot["id"]:
            silent_witness = guest_bot
        else:
            silent_witness = host_bot
        edge_w_to_addr = get_edge(
            conn, silent_witness["id"], addressee_bot["id"]
        ) or {"affinity": 50, "trust": 50, "summary": ""}
        edge_w_to_you = get_edge(conn, silent_witness["id"], "you") or {
            "affinity": 50,
            "trust": 50,
            "summary": "",
        }
        decision = await detect_interjection(
            client,
            classifier_model=settings.classifier_model,
            addressee_name=addressee_bot["name"],
            addressee_just_said=primary_text,
            silent_witness_name=silent_witness["name"],
            silent_witness_persona=silent_witness.get("persona") or "",
            silent_witness_edge_to_addressee=edge_w_to_addr,
            silent_witness_edge_to_you=edge_w_to_you,
            you_just_said=prose,
            timeout_s=settings.classifier_timeout_s,
        )
        if decision.should_interject:
            interjection_speaker_id = silent_witness["id"]
            # Re-read recent_dialogue so the just-appended assistant_turn
            # (the addressee's beat) is in the prompt context.
            interject_recent = _read_recent_dialogue(conn, chat_id, limit=20)
            if interject_recent and interject_recent[-1].get("speaker") == "you":
                interject_recent = interject_recent[:-1]
            interject_messages = assemble_narrative_prompt(
                conn,
                chat_id=chat_id,
                speaker_bot_id=silent_witness["id"],
                addressee=addressee_bot["id"],
                user_turn_prose=prompt_prose if prompt_prose else None,
                recent_dialogue=interject_recent,
                budget_soft=settings.narrative_budget_soft,
                budget_hard=settings.narrative_budget_hard,
                guest_id=guest_bot_id,
            )
            interject_accumulated: list[str] = []

            async def _stream_interjection() -> None:
                async for chunk in client.stream(
                    interject_messages,
                    model=settings.narrative_model,
                    max_tokens=settings.narrative_max_tokens,
                    temperature=settings.narrative_temperature,
                ):
                    interject_accumulated.append(chunk)
                    await publish(
                        chat_id,
                        {
                            "event": "token",
                            "text": chunk,
                            "speaker_id": silent_witness["id"],
                        },
                    )

            interject_task = asyncio.create_task(_stream_interjection())
            _in_flight_tasks[chat_id] = interject_task
            try:
                await interject_task
            except asyncio.CancelledError:
                interjection_truncated = True
                cancelled = True
            except Exception:
                interjection_truncated = True
            finally:
                _in_flight_tasks.pop(chat_id, None)
            interjection_text = "".join(interject_accumulated)
            append_event(
                conn,
                kind="assistant_turn",
                payload={
                    "chat_id": chat_id,
                    "speaker_id": silent_witness["id"],
                    "text": interjection_text,
                    "truncated": interjection_truncated,
                    "user_turn_id": user_turn_event_id,
                    "interjection_of": addressee_bot["id"],
                },
            )
            # Skip the downstream classifier passes if the interjection
            # was truncated (cancelled or errored mid-stream); we don't
            # want to score a partial beat the user never got to read in full.
            if not interjection_truncated:
                # Re-run the multi-pair state update: the interjector
                # adding their voice plausibly shifts edges for everyone
                # in the room. Safe enough for v2: deltas accumulate on top
                # of the post-primary state, so nothing stale is applied.
                # Re-read recent so the just-appended interjection turn is
                # in scope.
                recent_post_interject = _read_recent_dialogue(
                    conn, chat_id, limit=10
                )
                # Re-fetch prior edges so deltas land on the post-primary
                # state rather than the pre-turn baseline.
                _, _, _, prior_edges_post = _gather_state_update_inputs(
                    conn,
                    host_bot=host_bot,
                    guest_bot=guest_bot,
                    you_entity=you_entity,
                )
                state_updates_post = await compute_state_updates_for_present(
                    client,
                    classifier_model=settings.classifier_model,
                    present_ids=present_ids,
                    present_names=present_names,
                    personas=personas,
                    prior_edges=prior_edges_post,
                    recent_dialogue=recent_post_interject,
                    timeout_s=settings.classifier_timeout_s,
                )
                for src_id, tgt_id, update in state_updates_post:
                    append_and_apply(
                        conn,
                        kind="edge_update",
                        payload={
                            "source_id": src_id,
                            "target_id": tgt_id,
                            "chat_id": chat_id,
                            "affinity_delta": update.affinity_delta,
                            "trust_delta": update.trust_delta,
                            "knowledge_facts": update.knowledge_facts,
                            "last_interaction_at": last_at,
                            "last_interaction_chat_id": chat_id,
                        },
                    )
                # Memory write for the interjection beat: a second pair
                # of memory_written events (host + guest POVs).
                record_turn_memory_for_present(
                    conn,
                    chat_id=chat_id,
                    host_bot_id=host_bot["id"],
                    guest_bot_id=guest_bot_id,
                    narrative_text=interjection_text,
                    scene_id=scene["id"] if scene else None,
                    chat_clock_at=chat.get("time"),
                )

    # 9. Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn
    # and the optional interjection so the bots' responses are part of
    # the closing scene's final beat; closing before narrative would
    # force the bot to speak "in no scene", which is awkward. Hard
    # signals only in Phase 1: container change parsed from prose, or
    # explicit "fade out" / "we're done here" patterns. On classifier
    # failure the service returns ``should_close=False`` so the turn
    # flow keeps moving; the manual close button in the drawer is the
    # always-available fallback.
    #
    # Skip empty prose: there's no signal to classify and no point spending
    # a round-trip. Skip when there's no active scene (e.g. after a prior
    # close in the same chat): we have nothing to close. T13 (kickoff)
    # is the only scene-opener path in v1; Phase 2-3 will handle
    # automatic re-opening with the next container.
    if scene is not None and prose.strip():
        container = None
        if scene.get("container_id") is not None:
            container = get_container(conn, scene["container_id"])
        container_name = container["name"] if container else "unknown"
        decision = await detect_scene_close(
            client,
            model=settings.classifier_model,
            prose=prose,
            current_container_name=container_name,
        )
        if decision.should_close:
            append_and_apply(
                conn,
                kind="scene_closed",
                payload={
                    "scene_id": scene["id"],
                    "ended_at": chat.get("time"),
                    # T27 promotes the per-POV summary into ``edges.summary``
                    # but doesn't currently set scene significance; the
                    # async significance pass (T22) operates on memories.
                    "significance": 0,
                },
            )
            # T27 / T45: per-POV summary + edge summary update + knowledge
            # promotion for each present witness (host always; guest when
            # present). Runs synchronously after the close so the next
            # turn (or a subsequent GET /chats/<id>) sees the rewritten
            # memories and edge summaries. Tolerates classifier failure
            # (returns the empty default and skips the writes).
            await apply_scene_close_summary(
                conn,
                client,
                classifier_model=settings.classifier_model,
                chat_id=chat_id,
                scene_id=scene["id"],
                host_bot_id=host_bot["id"],
                timeout_s=settings.classifier_timeout_s,
            )

    # 10. Broadcast a JSON completion event (for JS consumers) and an HTML
    # fragment event (for HTMX SSE swap-into-timeline). One pair per
    # written assistant_turn so the timeline ends up with both the
    # primary and the interjection beat in the right order.
    await publish(
        chat_id,
        {
            "event": "assistant_turn_complete",
            "speaker_id": addressee_bot["id"],
            "text": primary_text,
            "truncated": primary_truncated,
        },
    )
    primary_html = _render_turn_html(
        addressee_bot["name"], primary_text, role="bot"
    )
    await publish(
        chat_id, {"event": "turn_html", "data": primary_html}
    )
    if interjection_text is not None and interjection_speaker_id is not None:
        # The interjector's display name is whichever bot wasn't the
        # addressee; pull it from the in-scope variables directly.
        interject_speaker_name = (
            host_bot["name"]
            if interjection_speaker_id == host_bot["id"]
            else (guest_bot["name"] if guest_bot is not None else "bot")
        )
        await publish(
            chat_id,
            {
                "event": "assistant_turn_complete",
                "speaker_id": interjection_speaker_id,
                "text": interjection_text,
                "truncated": interjection_truncated,
            },
        )
        interject_html = _render_turn_html(
            interject_speaker_name, interjection_text, role="bot"
        )
        await publish(
            chat_id, {"event": "turn_html", "data": interject_html}
        )
    if cancelled:
        # Re-raise after the partial turn has been recorded.
        raise asyncio.CancelledError
    return Response(status_code=204)
# ---------------------------------------------------------------------------
# Cancel route (Task 34).
#
# Fire-and-forget: the Stop button POSTs here, we mark the in-flight
# streaming Task as cancelled, and return 204 immediately. The cancel
# propagates into the streaming coroutine on its next await, the
# CancelledError handler in ``post_turn`` catches it, and the partial
# is committed with ``truncated=True``. No body is needed — the SSE
# channel is the conveyor of state. If no turn is in flight (or the
# task already completed), we 204 silently so the client can fire the
# Stop button without a precondition check.
# ---------------------------------------------------------------------------
@router.post("/chats/{chat_id}/turns/cancel")
async def cancel_turn(chat_id: str, request: Request):
    task = _in_flight_tasks.get(chat_id)
    if task is None or task.done():
        return Response(status_code=204)
    task.cancel()
    return Response(status_code=204)
# ---------------------------------------------------------------------------
# Rewind routes (Task 28).
#
# Two endpoints: a GET that renders the impact-preview modal, and a POST
# that actually executes the rewind. The execution path opens its own
# database connection because the route's ``conn`` is closed when the
# dependency-injection scope exits — passing it to ``execute_rewind``
# would dangle.
# ---------------------------------------------------------------------------
@router.get(
    "/chats/{chat_id}/rewind/preview/{event_id}",
    response_class=HTMLResponse,
)
async def rewind_preview(
    chat_id: str,
    event_id: int,
    request: Request,
    conn=Depends(get_conn),
):
    """Render the rewind impact-preview modal as a small HTML fragment.

    The HTMX form inside the fragment posts to the execute endpoint
    below. v1 keeps the markup minimal; Task 35 polishes the modal.
    """
    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    preview = compute_rewind_preview(conn, event_id)
    items = "".join(
        f"<li>{count} × {html.escape(kind)}</li>"
        for kind, count in preview["by_kind"].items()
    )
    body = (
        "<div class='rewind-modal'>"
        f"<h3>Rewind to event {event_id}?</h3>"
        f"<p>This will remove {preview['total_events']} events:</p>"
        f"<ul>{items}</ul>"
        f"<form hx-post='/chats/{html.escape(chat_id)}/rewind/{event_id}' "
        "hx-target='body' hx-swap='innerHTML'>"
        "<button type='submit'>Confirm Rewind</button>"
        "</form>"
        "</div>"
    )
    return HTMLResponse(body)
# ---------------------------------------------------------------------------
# Regenerate route (Task 29).
#
# A POST that re-streams the most recent assistant turn. The prior
# ``assistant_turn`` event is kept in the log but flagged
# ``superseded_by`` so the timeline filter in :func:`_read_recent_dialogue`
# hides it. When the user supplies ``prose`` the original ``user_turn``
# is also superseded by a fresh ``user_turn_edit`` event capturing the
# edit. Significance is *not* re-run on regenerate (per plan §11.1) but
# state-update + memory writes are.
# ---------------------------------------------------------------------------
@router.post("/chats/{chat_id}/turns/{event_id}/regenerate")
async def regenerate_turn(
    chat_id: str,
    event_id: int,
    request: Request,
    prose: str | None = Form(None),
    conn=Depends(get_conn),
    client=Depends(get_llm_client),
):
    """Regenerate the assistant turn referenced by ``event_id``.

    ``prose`` is optional. When provided (and non-empty) we capture a
    ``user_turn_edit`` event before re-streaming. Returns 204 on
    success, 404 when the chat or assistant_turn event is missing. The
    SSE channel emits per-token events as the new text arrives.
    """
    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    settings = request.app.state.settings
    # Local import keeps the module import graph flat (the service
    # imports from ``state`` / ``services`` siblings already).
    from chat.services.regenerate import regenerate_assistant_turn

    edited_prose = prose if prose else None
    try:
        await regenerate_assistant_turn(
            conn,
            client,
            settings=settings,
            chat_id=chat_id,
            original_assistant_event_id=event_id,
            edited_user_prose=edited_prose,
        )
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    return Response(status_code=204)


@router.post("/chats/{chat_id}/rewind/{event_id}")
async def rewind_execute(
    chat_id: str,
    event_id: int,
    request: Request,
    conn=Depends(get_conn),
):
    """Execute the rewind: snapshot, truncate event_log, re-project.

    Note: ``conn`` is only used to validate the chat exists. The actual
    rewind opens its own connection inside ``execute_rewind`` because
    we need it to commit independently and survive the route's
    dependency teardown.
    """
    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    settings = request.app.state.settings
    execute_rewind(
        db_path=settings.db_path,
        data_dir=settings.data_dir,
        after_event_id=event_id,
    )
    return RedirectResponse(url=f"/chats/{chat_id}", status_code=303)