992 lines
41 KiB
Python
992 lines
41 KiB
Python
"""POST ``/chats/<id>/turns`` — narrative turn flow with SSE streaming.
|
||
|
||
The turn flow strings together the pieces built in T17 (turn parser), T18
|
||
(prompt assembler), and T16 (SSE channel). Phase 2 (T44) extends it to
|
||
multi-entity scenes with optional guest support and a follow-on
|
||
interjection beat.
|
||
|
||
1. Parse the user's prose with the classifier into typed segments.
|
||
2. Append a ``user_turn`` event capturing both the original prose and the
|
||
parsed segments.
|
||
3. Append a placeholder ``assistant_turn_started`` marker so observers know
|
||
a response is in flight.
|
||
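4. Detect the addressee (host vs. guest). In a 1:1 chat the host is always
the addressee (see :func:`_detect_addressee_id`); with a guest present the
addressee classifier (``detect_addressee``) picks between host and guest.
This runs before the ``assistant_turn_started`` marker is written so the
marker records the bot that actually speaks.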
|
||
5. Build the narrative prompt for the addressee, dropping OOC segments
|
||
before they reach the bot (per Requirements §6.1 the OOC convention is
|
||
for the author to talk to the system, not to the in-fiction bot).
|
||
6. Stream tokens from the LLM, broadcasting each chunk over the chat's SSE
|
||
channel as a ``token`` event so any subscribed browser tab sees them
|
||
arrive in real time.
|
||
7. On stream complete, append an ``assistant_turn`` event with the full
|
||
text and ``truncated=False``. Then run a post-turn state-update pass
|
||
(Requirements §3.4): one classifier call per directed edge between
|
||
present entities, each producing an ``edge_update`` event with
|
||
affinity/trust/knowledge deltas.
|
||
8. When a guest is present, run the interjection classifier (§6.2). If it
|
||
fires we stream a second narrative as the silent witness, append a
|
||
second ``assistant_turn`` event linked to the same ``user_turn_id``,
|
||
and re-run memory + state-update for the interjector. The same
|
||
in-flight task covers both halves so cancel collapses both.
|
||
9. Scene-close detection runs after the (primary + optional interjection)
|
||
beats land so the close summary sees the full closing scene. T45's
|
||
guest-aware ``apply_scene_close_summary`` writes per-POV summaries for
|
||
each present witness.
|
||
10. Publish a ``turn_html`` event for each turn so HTMX's SSE extension
|
||
can append it to the timeline without a page reload.
|
||
11. Return ``204 No Content`` — the SSE channel is the real conveyor of
|
||
state, not the POST response body.
|
||
|
||
Errors during streaming flip the assistant_turn's ``truncated`` flag to
|
||
``True`` and we still commit what we received. ``asyncio.CancelledError``
|
||
is treated identically and re-raised after recording the partial turn.
|
||
A cancellation mid-interjection skips the interjector's state/memory
|
||
follow-up so we don't run classifiers against a half-formed beat.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import html
|
||
import json
|
||
import re
|
||
|
||
from fastapi import APIRouter, Depends, Form, HTTPException, Request
|
||
from fastapi.responses import HTMLResponse, RedirectResponse, Response
|
||
|
||
from chat.eventlog.log import append_and_apply, append_event
|
||
from chat.services.addressee import detect_addressee
|
||
from chat.services.background import SignificanceJob
|
||
from chat.services.event_lifecycle import detect_event_transitions
|
||
from chat.services.event_promotion import promote_completed_event
|
||
from chat.services.interjection import detect_interjection
|
||
from chat.services.memory_write import record_turn_memory_for_present
|
||
from chat.services.multi_state_update import compute_state_updates_for_present
|
||
from chat.services.prompt import assemble_narrative_prompt
|
||
from chat.services.rewind import compute_rewind_preview, execute_rewind
|
||
from chat.services.scene_close import detect_scene_close
|
||
from chat.services.scene_summarize import apply_scene_close_summary
|
||
from chat.services.turn_parse import ParsedTurn, parse_turn
|
||
from chat.state.edges import get_edge
|
||
from chat.state.entities import get_bot, get_you
|
||
from chat.state.events import list_active_events
|
||
from chat.state.world import active_scene, get_chat, get_container
|
||
from chat.web.bots import get_conn
|
||
from chat.web.kickoff import get_llm_client
|
||
from chat.web.pubsub import publish
|
||
from chat.web.render import render_turn_html as _render_turn_html
|
||
|
||
router = APIRouter()


# In-flight streaming tasks, at most one registered per chat_id. The cancel
# route (POST /chats/<id>/turns/cancel) looks a task up here and calls
# .cancel(); post_turn catches the resulting CancelledError, commits the
# partial output as truncated, and unregisters the task.
# This is process-local state: enough for the single-process v1 deployment
# (one user, possibly several tabs), but not shared across workers.
_in_flight_tasks: dict[str, asyncio.Task] = dict()
|
||
|
||
|
||
def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str:
    """Rebuild the in-fiction prose from ``parsed``, leaving out OOC segments.

    Segments whose ``kind`` is ``"ooc"`` (the ``((double parens))``
    author-to-system asides) stay in the stored user_turn payload for the
    transcript, but are dropped here so the narrative model never sees them.
    The remaining segment texts are joined with single spaces, and the result
    is stripped of leading and trailing whitespace.
    """
    in_fiction = (
        segment.text for segment in parsed.segments if segment.kind != "ooc"
    )
    return " ".join(in_fiction).strip()
|
||
|
||
|
||
def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
    """Return the most recent dialogue entries for ``chat_id``, oldest first.

    Reads ``user_turn``, ``user_turn_edit`` (T29 edited prose) and
    ``assistant_turn`` rows from ``event_log``. Superseded and hidden rows are
    skipped, so regenerated turns (T29) drop out of the rendered timeline.
    Each entry is shaped ``{"speaker": <id-or-"you">, "text": <prose>}`` for
    the prompt assembler and the chat-detail template.

    ``limit`` caps the number of entries returned *for this chat*. A value of
    0 returns nothing. A negative value means "no cap", mirroring SQLite's
    ``LIMIT -1``, which the previous implementation passed straight through.

    The chat filter lives in ``payload_json``, so it is applied in Python
    while scanning newest-first. The scan stops once ``limit`` matching rows
    are collected. Applying ``LIMIT`` in SQL before the chat filter would let
    other chats' newer events crowd this chat's history out of the window.
    """
    if limit == 0:
        return []
    cur = conn.execute(
        "SELECT id, kind, payload_json FROM event_log "
        "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
        "  AND superseded_by IS NULL AND hidden = 0 "
        "ORDER BY id DESC"
    )
    newest_first: list[dict] = []
    try:
        for _row_id, kind, payload_json in cur:
            payload = json.loads(payload_json)
            if payload.get("chat_id") != chat_id:
                continue
            if kind in ("user_turn", "user_turn_edit"):
                # Edited prose substitutes for the original user_turn (the
                # original is marked superseded_by and filtered in SQL).
                newest_first.append(
                    {"speaker": "you", "text": payload.get("prose", "")}
                )
            else:
                newest_first.append(
                    {
                        "speaker": payload.get("speaker_id", "bot"),
                        "text": payload.get("text", ""),
                    }
                )
            if 0 < limit <= len(newest_first):
                break
    finally:
        cur.close()
    newest_first.reverse()  # back to chronological order
    return newest_first
|
||
|
||
|
||
def _prose_mentions_name(name: str | None, prose: str) -> bool:
    """Return True when ``name`` appears in ``prose`` as a standalone token.

    Matching is case-insensitive. The name must not be glued to a word
    character on either side. Look-around assertions are used instead of
    word-boundary anchors so that names which begin or end with punctuation
    (e.g. ``"J.R."``) still match. For names that start and end with word
    characters, the result is identical to a word-boundary match. Empty or
    whitespace-only names never match.
    """
    cleaned = (name or "").strip()
    if not cleaned:
        return False
    pattern = rf"(?<!\w){re.escape(cleaned)}(?!\w)"
    return re.search(pattern, prose, re.IGNORECASE) is not None


def _detect_addressee_id(
    prose: str, host_bot: dict, guest_bot: dict | None
) -> str:
    """Return the bot id of the addressee for ``prose``.

    Uses a simple case-insensitive whole-name match. The host is the
    default: the addressee flips to the guest only when the guest's name
    appears in the prose AND the host's does not. If both names match, or
    neither does, the host keeps the floor. This bias keeps the primary
    speaker stable across ambiguous prose; the interjection branch (later in
    the turn flow) is how the silent witness gets a word in edgewise when
    warranted.

    With no guest (``guest_bot is None``) the host id is returned
    unconditionally.
    """
    if guest_bot is None:
        return host_bot["id"]
    host_match = _prose_mentions_name(host_bot.get("name"), prose)
    guest_match = _prose_mentions_name(guest_bot.get("name"), prose)
    if guest_match and not host_match:
        return guest_bot["id"]
    return host_bot["id"]
|
||
|
||
|
||
def _gather_state_update_inputs(
    conn,
    *,
    host_bot: dict,
    guest_bot: dict | None,
    you_entity: dict,
) -> tuple[list[str], dict[str, str], dict[str, str], dict[tuple[str, str], dict]]:
    """Build the inputs for one multi-entity state-update pass.

    Returns ``(present_ids, present_names, personas, prior_edges)``. The
    present entities are always the host and ``you``, plus the guest when one
    is given. ``prior_edges`` holds one entry per ordered pair of distinct
    present ids. Pairs with no stored edge get the 50/50 baseline with an
    empty summary, which mirrors the Phase 1 single-pair flow.

    Ordering is load-bearing: host first, then ``you``, then the guest. The
    directed-pair iteration in :func:`compute_state_updates_for_present`
    then matches the Phase 1 sequence (host->you, then you->host), and
    existing tests pin their canned-response queue to exactly that order.
    """
    # (entity id, display name, persona) in the load-bearing order.
    members: list[tuple[str, str, str]] = [
        (host_bot["id"], host_bot["name"], host_bot.get("persona") or ""),
        ("you", you_entity.get("name") or "you", you_entity.get("persona") or ""),
    ]
    if guest_bot is not None:
        members.append(
            (guest_bot["id"], guest_bot["name"], guest_bot.get("persona") or "")
        )

    present_ids: list[str] = [member_id for member_id, _, _ in members]
    present_names: dict[str, str] = {
        member_id: name for member_id, name, _ in members
    }
    personas: dict[str, str] = {
        member_id: persona for member_id, _, persona in members
    }

    prior_edges: dict[tuple[str, str], dict] = {
        (src, tgt): get_edge(conn, src, tgt)
        or {"affinity": 50, "trust": 50, "summary": ""}
        for src in present_ids
        for tgt in present_ids
        if src != tgt
    }
    return present_ids, present_names, personas, prior_edges
|
||
|
||
|
||
def _unregister_in_flight(chat_id: str, task: asyncio.Task) -> None:
    """Drop ``chat_id``'s registry entry, but only if it still points at ``task``.

    Two tabs can post turns to the same chat concurrently, and the second
    registration overwrites the first. Popping unconditionally when the
    first turn finishes would silently unregister the second (newer) turn
    and turn its Stop button into a no-op.
    """
    # No await between the check and the delete, so no other coroutine can
    # interleave here.
    if _in_flight_tasks.get(chat_id) is task:
        del _in_flight_tasks[chat_id]


async def _run_registered_stream(chat_id: str, stream) -> tuple[bool, bool]:
    """Run the ``stream`` coroutine as a cancellable task registered for ``chat_id``.

    The task is registered in ``_in_flight_tasks`` so the
    ``/turns/cancel`` route can call ``Task.cancel()`` on it.

    Returns ``(truncated, cancelled)``:

    * ``(False, False)``: the stream completed normally.
    * ``(True, True)``: the stream was cancelled. The ``CancelledError`` is
      swallowed here; the caller commits the partial text and re-raises at
      the end of the turn.
    * ``(True, False)``: the stream raised any other ``Exception``. It is
      surfaced as a truncated turn rather than losing the partial output.
    """
    task = asyncio.create_task(stream)
    _in_flight_tasks[chat_id] = task
    try:
        await task
    except asyncio.CancelledError:
        return True, True
    except Exception:
        return True, False
    finally:
        _unregister_in_flight(chat_id, task)
    return False, False


async def _stream_narrative(
    client,
    messages,
    *,
    settings,
    chat_id: str,
    speaker_id: str,
    sink: list[str],
) -> None:
    """Stream a narrative completion, appending each chunk to ``sink``.

    Each chunk is also broadcast over the chat's SSE channel as a ``token``
    event tagged with ``speaker_id``. ``sink`` is owned by the caller, so the
    text received before a cancellation or error is still available
    afterwards.
    """
    async for chunk in client.stream(
        messages,
        model=settings.narrative_model,
        max_tokens=settings.narrative_max_tokens,
        temperature=settings.narrative_temperature,
    ):
        sink.append(chunk)
        await publish(
            chat_id,
            {
                "event": "token",
                "text": chunk,
                "speaker_id": speaker_id,
            },
        )


def _append_edge_updates(conn, *, chat_id: str, state_updates, last_at) -> None:
    """Append and apply one ``edge_update`` event per ``(source, target, update)``.

    ``state_updates`` is what :func:`compute_state_updates_for_present`
    returns. Each ``update`` exposes ``affinity_delta``, ``trust_delta`` and
    ``knowledge_facts``.
    """
    for src_id, tgt_id, update in state_updates:
        append_and_apply(
            conn,
            kind="edge_update",
            payload={
                "source_id": src_id,
                "target_id": tgt_id,
                "chat_id": chat_id,
                "affinity_delta": update.affinity_delta,
                "trust_delta": update.trust_delta,
                "knowledge_facts": update.knowledge_facts,
                "last_interaction_at": last_at,
                "last_interaction_chat_id": chat_id,
            },
        )


def _enqueue_host_significance(
    worker,
    memory_results,
    *,
    host_bot_id: str,
    narrative_text: str,
    prior_dialogue: list[dict],
) -> None:
    """Queue a significance pass on the host's memory for one narrative beat.

    ``memory_results`` is the mapping returned by
    :func:`record_turn_memory_for_present`; element ``[1]`` of the host's
    entry is used as the memory id. This is a no-op when no background
    worker is configured or the host got no memory row.
    """
    host_entry = memory_results.get(host_bot_id)
    memory_id = host_entry[1] if host_entry else None
    if worker is None or memory_id is None:
        return
    worker.enqueue(
        SignificanceJob(
            memory_id=memory_id,
            narrative_text=narrative_text,
            prior_dialogue=prior_dialogue,
            host_bot_id=host_bot_id,
        )
    )


def _apply_event_transitions(
    conn, *, chat_id: str, chat_clock_at, transitions
) -> None:
    """Land each lifecycle transition as an ``event_*`` event.

    A completion is followed inline by :func:`promote_completed_event`, so
    the event's structured artifacts land in state in the same turn.
    ``promote_completed_event`` is synchronous and skips silently when the
    event row's status isn't ``'completed'``.

    Any ``new_status`` other than ``active`` / ``completed`` / ``cancelled``
    is ignored. The lifecycle service constrains its schema to those three,
    and the defensive no-op keeps the turn flow tolerant of unexpected
    output.
    """
    for transition in transitions:
        status = transition.new_status
        if status == "active":
            append_and_apply(
                conn,
                kind="event_started",
                payload={
                    "event_id": transition.event_id,
                    "started_at": chat_clock_at,
                },
            )
        elif status == "completed":
            append_and_apply(
                conn,
                kind="event_completed",
                payload={
                    "event_id": transition.event_id,
                    "completed_at": chat_clock_at,
                },
            )
            promote_completed_event(
                conn,
                event_id=transition.event_id,
                chat_id=chat_id,
                chat_clock_at=chat_clock_at,
            )
        elif status == "cancelled":
            append_and_apply(
                conn,
                kind="event_cancelled",
                payload={
                    "event_id": transition.event_id,
                    "completed_at": chat_clock_at,
                },
            )


async def _publish_turn_complete(
    chat_id: str,
    *,
    speaker_id: str,
    speaker_name: str,
    text: str,
    truncated: bool,
) -> None:
    """Broadcast one finished assistant turn over SSE.

    Two events are sent, in order: a JSON ``assistant_turn_complete`` (for
    JS consumers) and a ``turn_html`` fragment (for the HTMX SSE swap into
    the timeline).
    """
    await publish(
        chat_id,
        {
            "event": "assistant_turn_complete",
            "speaker_id": speaker_id,
            "text": text,
            "truncated": truncated,
        },
    )
    turn_html = _render_turn_html(speaker_name, text, role="bot")
    await publish(chat_id, {"event": "turn_html", "data": turn_html})


@router.post("/chats/{chat_id}/turns")
async def post_turn(
    chat_id: str,
    request: Request,
    prose: str = Form(""),
    conn=Depends(get_conn),
    client=Depends(get_llm_client),
):
    """Run one narrative turn for ``chat_id`` and return ``204 No Content``.

    The flow is: parse the prose, record the user turn, pick the addressee,
    stream the addressee's reply (plus an optional guest interjection), then
    run the memory, state-update, event-lifecycle and scene-close passes and
    publish the finished turns. The response body is empty; all output
    reaches the browser over the chat's SSE channel.

    Raises:
        HTTPException: 400 if ``prose`` is blank; 404 if the chat or its host
            bot does not exist.
        asyncio.CancelledError: re-raised at the very end, after the partial
            turn and all follow-up passes are recorded, when the primary or
            the interjection stream was cancelled.
    """
    if not prose.strip():
        raise HTTPException(status_code=400, detail="prose cannot be empty")

    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")

    host_bot = get_bot(conn, chat["host_bot_id"])
    if host_bot is None:
        # Defensive: chat row references a missing bot.
        raise HTTPException(
            status_code=404,
            detail=f"host bot not found: {chat['host_bot_id']}",
        )

    guest_bot = None
    guest_bot_id = chat.get("guest_bot_id")
    if guest_bot_id is not None:
        # T47's bot_reset cascade clears guest_bot_id from any chat that
        # referenced the deleted bot, so by the time we read it here it's
        # either None or a live bot id. The previous defensive
        # degrade-to-1:1 block (T44) was rendered dead by T47 and removed
        # in T74.4.
        guest_bot = get_bot(conn, guest_bot_id)

    settings = request.app.state.settings

    # 1. Parse turn (classifier).
    parsed = await parse_turn(
        client, model=settings.classifier_model, prose=prose
    )
    prompt_prose = _strip_ooc_for_prompt(parsed)

    # 2. Append user_turn event.
    user_turn_event_id = append_event(
        conn,
        kind="user_turn",
        payload={
            "chat_id": chat_id,
            "prose": prose,
            "segments": [s.model_dump() for s in parsed.segments],
        },
    )

    # 3. Determine the addressee. This is done before assistant_turn_started
    # so the placeholder names the bot the user is actually talking to.
    # T74.1 routes the multi-entity case through the addressee classifier.
    # The no-guest case uses the name-match fast path, since there is
    # nothing to classify when only one bot is present.
    if guest_bot is None:
        addressee_id = _detect_addressee_id(prose, host_bot, guest_bot)
    else:
        decision = await detect_addressee(
            client,
            classifier_model=settings.classifier_model,
            user_prose=prose,
            host_id=host_bot["id"],
            host_name=host_bot["name"],
            guest_id=guest_bot["id"],
            guest_name=guest_bot["name"],
            timeout_s=settings.classifier_timeout_s,
        )
        addressee_id = decision.addressee_id
    # Anything other than the guest's id (including an unexpected
    # classifier value) resolves to the host.
    addressee_bot = (
        guest_bot
        if (guest_bot is not None and addressee_id == guest_bot["id"])
        else host_bot
    )

    # 4. Append assistant_turn_started placeholder. ``user_turn``,
    # ``assistant_turn_started`` and ``assistant_turn`` have no registered
    # projector handlers; they live in the event_log purely for transcript
    # rendering, so ``project`` is not called. (Re-projecting now would
    # also re-run prior non-idempotent inserts like ``chat_created``.)
    append_event(
        conn,
        kind="assistant_turn_started",
        payload={
            "chat_id": chat_id,
            "speaker_id": addressee_bot["id"],
            "user_turn_id": user_turn_event_id,
        },
    )

    # 5. Build the narrative prompt for the addressee. ``guest_id`` is passed
    # explicitly so the assembler renders the guest's activity / group-node
    # block when applicable; it tolerates ``None`` for 1:1 chats.
    recent = _read_recent_dialogue(conn, chat_id, limit=20)
    # Drop the just-appended user turn; it is passed as ``user_turn_prose``
    # and would otherwise appear twice.
    if recent and recent[-1].get("speaker") == "you":
        recent = recent[:-1]
    messages = assemble_narrative_prompt(
        conn,
        chat_id=chat_id,
        speaker_bot_id=addressee_bot["id"],
        user_turn_prose=prompt_prose if prompt_prose else None,
        recent_dialogue=recent,
        budget_soft=settings.narrative_budget_soft,
        budget_hard=settings.narrative_budget_hard,
        guest_id=guest_bot_id,
    )

    # 6. Stream and accumulate tokens as a registered, cancellable task.
    # ``primary_accumulated`` is owned here, so whatever arrived before a
    # cancellation or error is still visible afterwards.
    primary_accumulated: list[str] = []
    primary_truncated, cancelled = await _run_registered_stream(
        chat_id,
        _stream_narrative(
            client,
            messages,
            settings=settings,
            chat_id=chat_id,
            speaker_id=addressee_bot["id"],
            sink=primary_accumulated,
        ),
    )
    primary_text = "".join(primary_accumulated)

    # 7. Append the assistant_turn with the final (possibly partial) text.
    append_event(
        conn,
        kind="assistant_turn",
        payload={
            "chat_id": chat_id,
            "speaker_id": addressee_bot["id"],
            "text": primary_text,
            "truncated": primary_truncated,
            "user_turn_id": user_turn_event_id,
        },
    )

    # 7a. Per-turn memory write (Plan §11.1, T21 / T41). With a guest present
    # this fans out to one ``memory_written`` event per witness; without one
    # it is the Phase 1 single write keyed on the host.
    scene = active_scene(conn, chat_id)
    memory_results = record_turn_memory_for_present(
        conn,
        chat_id=chat_id,
        host_bot_id=host_bot["id"],
        guest_bot_id=guest_bot_id,
        narrative_text=primary_text,
        scene_id=scene["id"] if scene else None,
        chat_clock_at=chat.get("time"),
    )

    # 7b. Post-turn state-update pass (Requirements §3.4 / T40) over all
    # directed pairs of present entities: 2 pairs for 1:1, 6 for
    # 3-entity scenes.
    you_entity = get_you(conn) or {"name": "you", "persona": ""}
    last_at = chat.get("time")
    recent_for_update = _read_recent_dialogue(conn, chat_id, limit=10)

    present_ids, present_names, personas, prior_edges = (
        _gather_state_update_inputs(
            conn,
            host_bot=host_bot,
            guest_bot=guest_bot,
            you_entity=you_entity,
        )
    )
    state_updates = await compute_state_updates_for_present(
        client,
        classifier_model=settings.classifier_model,
        present_ids=present_ids,
        present_names=present_names,
        personas=personas,
        prior_edges=prior_edges,
        recent_dialogue=recent_for_update,
        timeout_s=settings.classifier_timeout_s,
    )
    _append_edge_updates(
        conn, chat_id=chat_id, state_updates=state_updates, last_at=last_at
    )

    # 7c. Enqueue the async significance pass (Plan §11.1, T22) on the
    # host's memory. Guest POV memories carry the same prose and piggyback
    # on this score; per-POV rewrite happens at scene close (T45).
    worker = getattr(request.app.state, "background_worker", None)
    _enqueue_host_significance(
        worker,
        memory_results,
        host_bot_id=host_bot["id"],
        narrative_text=primary_text,
        prior_dialogue=recent_for_update,
    )

    # 8. Interjection branch (T39 / T44). This only runs with a guest
    # present and a primary stream that completed with non-blank text. A
    # cancelled or errored primary skips it, so the classifier never runs
    # against a half-formed beat.
    interjection_text: str | None = None
    interjection_speaker_id: str | None = None
    interjection_speaker_name: str | None = None
    interjection_truncated = False
    if (
        guest_bot is not None
        and not cancelled
        and not primary_truncated
        and primary_text.strip()
    ):
        # The silent witness is whichever bot did NOT take the floor. Derive
        # it from ``addressee_bot`` (not the raw id) so an unexpected
        # classifier id can never make the host interject on itself.
        silent_witness = host_bot if addressee_bot is guest_bot else guest_bot

        edge_w_to_addr = get_edge(
            conn, silent_witness["id"], addressee_bot["id"]
        ) or {"affinity": 50, "trust": 50, "summary": ""}
        edge_w_to_you = get_edge(conn, silent_witness["id"], "you") or {
            "affinity": 50,
            "trust": 50,
            "summary": "",
        }

        decision = await detect_interjection(
            client,
            classifier_model=settings.classifier_model,
            addressee_name=addressee_bot["name"],
            addressee_just_said=primary_text,
            silent_witness_name=silent_witness["name"],
            silent_witness_persona=silent_witness.get("persona") or "",
            silent_witness_edge_to_addressee=edge_w_to_addr,
            silent_witness_edge_to_you=edge_w_to_you,
            you_just_said=prose,
            timeout_s=settings.classifier_timeout_s,
        )

        if decision.should_interject:
            interjection_speaker_id = silent_witness["id"]
            interjection_speaker_name = silent_witness["name"]

            # Re-read so the addressee's just-appended beat is in context.
            interject_recent = _read_recent_dialogue(conn, chat_id, limit=20)
            if interject_recent and interject_recent[-1].get("speaker") == "you":
                interject_recent = interject_recent[:-1]
            interject_messages = assemble_narrative_prompt(
                conn,
                chat_id=chat_id,
                speaker_bot_id=silent_witness["id"],
                addressee=addressee_bot["id"],
                user_turn_prose=prompt_prose if prompt_prose else None,
                recent_dialogue=interject_recent,
                budget_soft=settings.narrative_budget_soft,
                budget_hard=settings.narrative_budget_hard,
                guest_id=guest_bot_id,
            )

            interject_accumulated: list[str] = []
            interjection_truncated, interject_cancelled = (
                await _run_registered_stream(
                    chat_id,
                    _stream_narrative(
                        client,
                        interject_messages,
                        settings=settings,
                        chat_id=chat_id,
                        speaker_id=silent_witness["id"],
                        sink=interject_accumulated,
                    ),
                )
            )
            if interject_cancelled:
                cancelled = True
            interjection_text = "".join(interject_accumulated)

            append_event(
                conn,
                kind="assistant_turn",
                payload={
                    "chat_id": chat_id,
                    "speaker_id": silent_witness["id"],
                    "text": interjection_text,
                    "truncated": interjection_truncated,
                    "user_turn_id": user_turn_event_id,
                    "interjection_of": addressee_bot["id"],
                },
            )

            # A truncated interjection skips the follow-up classifier passes,
            # so a partial beat the user never read in full is not scored.
            if not interjection_truncated:
                # Re-run the multi-pair state update: the interjector's voice
                # plausibly shifts edges for everyone in the room. Deltas
                # accumulate. Re-read recent so the interjection is in scope,
                # and re-fetch prior edges so deltas land on the post-primary
                # state.
                recent_post_interject = _read_recent_dialogue(
                    conn, chat_id, limit=10
                )
                _, _, _, prior_edges_post = _gather_state_update_inputs(
                    conn,
                    host_bot=host_bot,
                    guest_bot=guest_bot,
                    you_entity=you_entity,
                )
                state_updates_post = await compute_state_updates_for_present(
                    client,
                    classifier_model=settings.classifier_model,
                    present_ids=present_ids,
                    present_names=present_names,
                    personas=personas,
                    prior_edges=prior_edges_post,
                    recent_dialogue=recent_post_interject,
                    timeout_s=settings.classifier_timeout_s,
                )
                _append_edge_updates(
                    conn,
                    chat_id=chat_id,
                    state_updates=state_updates_post,
                    last_at=last_at,
                )

                # Memory write for the interjection beat (a second set of
                # memory_written events for the present witnesses).
                interject_memory_results = record_turn_memory_for_present(
                    conn,
                    chat_id=chat_id,
                    host_bot_id=host_bot["id"],
                    guest_bot_id=guest_bot_id,
                    narrative_text=interjection_text,
                    scene_id=scene["id"] if scene else None,
                    chat_clock_at=chat.get("time"),
                )

                # T74.2: score the interjection memory too. Otherwise it
                # lands in memory unscored and can never auto-pin.
                _enqueue_host_significance(
                    worker,
                    interject_memory_results,
                    host_bot_id=host_bot["id"],
                    narrative_text=interjection_text,
                    prior_dialogue=recent_post_interject,
                )

    # 8a. Event-lifecycle detection (Phase 3, T61). This runs after the
    # post-turn classifier passes and BEFORE scene-close detection. It is
    # skipped entirely when there are no active events, so those chats pay
    # no classifier round-trip.
    active_events = list_active_events(conn, chat_id)
    if active_events:
        lifecycle_decision = await detect_event_transitions(
            client,
            classifier_model=settings.classifier_model,
            narrative_text=primary_text,
            active_events=active_events,
            timeout_s=settings.classifier_timeout_s,
        )
        _apply_event_transitions(
            conn,
            chat_id=chat_id,
            chat_clock_at=chat.get("time"),
            transitions=lifecycle_decision.transitions,
        )

    # 9. Scene-close detection (Plan §7.2, T26). This runs AFTER the
    # assistant turn(s), so the bots' responses belong to the closing scene.
    # On classifier failure the service returns ``should_close=False``; the
    # drawer's manual close button is the always-available fallback.
    #
    # T74.3: this deliberately runs even when ``cancelled`` is True. Close
    # detection consumes only the user's prose (appended before streaming)
    # and the container name, not the bot's output. Pinned by
    # test_cancelled_turn_still_closes_scene_when_user_prose_signals_close.
    if scene is not None and prose.strip():
        container = None
        if scene.get("container_id") is not None:
            container = get_container(conn, scene["container_id"])
        container_name = container["name"] if container else "unknown"
        decision = await detect_scene_close(
            client,
            model=settings.classifier_model,
            prose=prose,
            current_container_name=container_name,
        )
        if decision.should_close:
            append_and_apply(
                conn,
                kind="scene_closed",
                payload={
                    "scene_id": scene["id"],
                    "ended_at": chat.get("time"),
                    # T27 promotes the per-POV summary into ``edges.summary``
                    # but doesn't set scene significance; the async
                    # significance pass (T22) operates on memories.
                    "significance": 0,
                },
            )
            # T27 / T45: per-POV summary, edge summary update and knowledge
            # promotion for each present witness. This runs synchronously so
            # the next request sees the rewritten state.
            await apply_scene_close_summary(
                conn,
                client,
                classifier_model=settings.classifier_model,
                chat_id=chat_id,
                scene_id=scene["id"],
                host_bot_id=host_bot["id"],
                timeout_s=settings.classifier_timeout_s,
            )

    # 10. Broadcast each written assistant_turn (primary first, then the
    # optional interjection) so the timeline ends up in the right order.
    await _publish_turn_complete(
        chat_id,
        speaker_id=addressee_bot["id"],
        speaker_name=addressee_bot["name"],
        text=primary_text,
        truncated=primary_truncated,
    )
    if (
        interjection_text is not None
        and interjection_speaker_id is not None
        and interjection_speaker_name is not None
    ):
        await _publish_turn_complete(
            chat_id,
            speaker_id=interjection_speaker_id,
            speaker_name=interjection_speaker_name,
            text=interjection_text,
            truncated=interjection_truncated,
        )

    if cancelled:
        # Re-raise after the partial turn has been recorded.
        raise asyncio.CancelledError

    return Response(status_code=204)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cancel route (Task 34).
|
||
#
|
||
# Fire-and-forget: the Stop button POSTs here, we mark the in-flight
|
||
# streaming Task as cancelled, and return 204 immediately. The cancel
|
||
# propagates into the streaming coroutine on its next await, the
|
||
# CancelledError handler in ``post_turn`` catches it, and the partial
|
||
# is committed with ``truncated=True``. No body is needed — the SSE
|
||
# channel is the conveyor of state. If no turn is in flight (or the
|
||
# task already completed), we 204 silently so the client can fire the
|
||
# Stop button without a precondition check.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@router.post("/chats/{chat_id}/turns/cancel")
async def cancel_turn(chat_id: str, request: Request):
    """Ask the chat's in-flight streaming turn to stop.

    Always answers 204 No Content. When ``_in_flight_tasks`` holds nothing
    for ``chat_id``, or the registered task has already finished, the call
    is a silent no-op, so the Stop button can fire without a precondition
    check. ``request`` is accepted but unused.
    """
    in_flight = _in_flight_tasks.get(chat_id)
    if in_flight is not None and not in_flight.done():
        # Cancellation is only requested here; the streaming coroutine sees
        # it at its next await and commits the partial turn itself.
        in_flight.cancel()
    return Response(status_code=204)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rewind routes (Task 28).
|
||
#
|
||
# Two endpoints: a GET that renders the impact-preview modal, and a POST
|
||
# that actually executes the rewind. The execution path opens its own
|
||
# database connection because the route's ``conn`` is closed when the
|
||
# dependency-injection scope exits — passing it to ``execute_rewind``
|
||
# would dangle.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@router.get(
    "/chats/{chat_id}/rewind/preview/{event_id}",
    response_class=HTMLResponse,
)
async def rewind_preview(
    chat_id: str,
    event_id: int,
    request: Request,
    conn=Depends(get_conn),
):
    """Render the rewind impact-preview modal as a small HTML fragment.

    The fragment shows how many events a rewind to ``event_id`` would
    remove, broken down by kind. It embeds an HTMX form that posts to the
    execute endpoint ``/chats/<chat_id>/rewind/<event_id>``. v1 keeps the
    markup minimal — Task 35 polishes the modal. ``request`` is accepted
    but unused.

    Raises:
        HTTPException: 404 when ``chat_id`` does not name an existing chat.
    """
    from urllib.parse import quote

    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    # NOTE(review): compute_rewind_preview is not given chat_id, so nothing
    # here checks that event_id belongs to this chat — confirm it is scoped
    # correctly downstream.
    preview = compute_rewind_preview(conn, event_id)
    items = "".join(
        f"<li>{count} × {html.escape(kind)}</li>"
        for kind, count in preview["by_kind"].items()
    )
    total = preview["total_events"]
    noun = "event" if total == 1 else "events"
    # Percent-encode chat_id as a single URL path segment, then HTML-escape
    # the whole URL because it sits inside a single-quoted attribute.
    action = html.escape(f"/chats/{quote(chat_id, safe='')}/rewind/{event_id}")
    body = (
        "<div class='rewind-modal'>"
        f"<h3>Rewind to event {event_id}?</h3>"
        f"<p>This will remove {total} {noun}:</p>"
        f"<ul>{items}</ul>"
        f"<form hx-post='{action}' "
        "hx-target='body' hx-swap='innerHTML'>"
        "<button type='submit'>Confirm Rewind</button>"
        "</form>"
        "</div>"
    )
    return HTMLResponse(body)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Regenerate route (Task 29).
|
||
#
|
||
# A POST that re-streams the most recent assistant turn. The prior
|
||
# ``assistant_turn`` event is kept in the log but flagged
|
||
# ``superseded_by`` so the timeline filter in :func:`_read_recent_dialogue`
|
||
# hides it. When the user supplies ``prose`` the original ``user_turn``
|
||
# is also superseded by a fresh ``user_turn_edit`` event capturing the
|
||
# edit. Significance is *not* re-run on regenerate (per plan §11.1) but
|
||
# state-update + memory writes are.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@router.post("/chats/{chat_id}/turns/{event_id}/regenerate")
async def regenerate_turn(
    chat_id: str,
    event_id: int,
    request: Request,
    prose: str | None = Form(None),
    conn=Depends(get_conn),
    client=Depends(get_llm_client),
):
    """Regenerate the assistant turn referenced by ``event_id``.

    ``prose`` is optional. When it contains non-whitespace text, it is
    forwarded as ``edited_user_prose`` so a ``user_turn_edit`` event is
    captured before re-streaming. Missing, empty or whitespace-only prose
    means "regenerate without editing". The SSE channel emits per-token
    events as the new text arrives.

    Returns:
        204 No Content on success.

    Raises:
        HTTPException: 404 when the chat does not exist. Also 404 when
            ``regenerate_assistant_turn`` raises ``ValueError``; presumably
            this covers a missing/invalid assistant_turn event, but any
            ValueError from the service is mapped the same way.
    """
    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    settings = request.app.state.settings
    # Local import keeps the module import graph flat (the service
    # imports from ``state`` / ``services`` siblings already).
    from chat.services.regenerate import regenerate_assistant_turn

    # Blank or whitespace-only prose would otherwise be recorded as an edit
    # that replaces the user's turn with nothing; treat it as "no edit".
    # Non-blank prose is forwarded verbatim.
    edited_prose = prose if prose is not None and prose.strip() else None
    try:
        await regenerate_assistant_turn(
            conn,
            client,
            settings=settings,
            chat_id=chat_id,
            original_assistant_event_id=event_id,
            edited_user_prose=edited_prose,
        )
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    return Response(status_code=204)
|
||
|
||
|
||
@router.post("/chats/{chat_id}/rewind/{event_id}")
async def rewind_execute(
    chat_id: str,
    event_id: int,
    request: Request,
    conn=Depends(get_conn),
):
    """Execute the rewind: snapshot, truncate event_log, re-project.

    The injected ``conn`` serves only to confirm the chat exists (404
    otherwise). The rewind itself is delegated to ``execute_rewind``, which
    receives the database path and data directory from the app settings.
    It opens its own connection so it can commit independently and survive
    the route's dependency teardown. On success the client is redirected to
    ``/chats/<chat_id>`` with 303 See Other.
    """
    if get_chat(conn, chat_id) is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    app_settings = request.app.state.settings
    # NOTE(review): execute_rewind is not awaited, so (being synchronous) it
    # presumably blocks the event loop for its full duration. It is also not
    # given chat_id — confirm event_id is scoped to this chat downstream.
    execute_rewind(
        db_path=app_settings.db_path,
        data_dir=app_settings.data_dir,
        after_event_id=event_id,
    )
    return RedirectResponse(url=f"/chats/{chat_id}", status_code=303)
|