"""POST ``/chats//turns`` — narrative turn flow with SSE streaming. The turn flow strings together the pieces built in T17 (turn parser), T18 (prompt assembler), and T16 (SSE channel). Phase 2 (T44) extends it to multi-entity scenes with optional guest support and a follow-on interjection beat. 1. Parse the user's prose with the classifier into typed segments. 2. Append a ``user_turn`` event capturing both the original prose and the parsed segments. 3. Append a placeholder ``assistant_turn_started`` marker so observers know a response is in flight. 4. Detect the addressee (host vs. guest) from the prose using a simple word-boundary substring match — see :func:`_detect_addressee_id`. 5. Build the narrative prompt for the addressee, dropping OOC segments before they reach the bot (per Requirements §6.1 the OOC convention is for the author to talk to the system, not to the in-fiction bot). 6. Stream tokens from the LLM, broadcasting each chunk over the chat's SSE channel as a ``token`` event so any subscribed browser tab sees them arrive in real time. 7. On stream complete, append an ``assistant_turn`` event with the full text and ``truncated=False``. Then run a post-turn state-update pass (Requirements §3.4): one classifier call per directed edge between present entities, each producing an ``edge_update`` event with affinity/trust/knowledge deltas. 8. When a guest is present, run the interjection classifier (§6.2). If it fires we stream a second narrative as the silent witness, append a second ``assistant_turn`` event linked to the same ``user_turn_id``, and re-run memory + state-update for the interjector. The same in-flight task covers both halves so cancel collapses both. 9. Scene-close detection runs after the (primary + optional interjection) beats land so the close summary sees the full closing scene. T45's guest-aware ``apply_scene_close_summary`` writes per-POV summaries for each present witness. 10. 
Publish a ``turn_html`` event for each turn so HTMX's SSE extension can append it to the timeline without a page reload. 11. Return ``204 No Content`` — the SSE channel is the real conveyor of state, not the POST response body. Errors during streaming flip the assistant_turn's ``truncated`` flag to ``True`` and we still commit what we received. ``asyncio.CancelledError`` is treated identically and re-raised after recording the partial turn. A cancellation mid-interjection skips the interjector's state/memory follow-up so we don't run classifiers against a half-formed beat. """ from __future__ import annotations import asyncio import html import json import re from datetime import timedelta from fastapi import APIRouter, Depends, Form, HTTPException, Request from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, Response from chat.eventlog.log import append_and_apply, append_event from chat.services.addressee import detect_addressee from chat.services.background import SignificanceJob from chat.services.event_lifecycle import detect_event_transitions from chat.services.event_promotion import promote_completed_event from chat.services.interjection import detect_interjection from chat.services.memory_write import record_turn_memory_for_present from chat.services.multi_state_update import compute_state_updates_for_present from chat.services.prompt import ( assemble_narrative_prompt, consume_pending_meanwhile_digests, ) from chat.services.rewind import compute_rewind_preview, execute_rewind from chat.services.scene_close import detect_scene_close from chat.services.scene_summarize import apply_scene_close_summary from chat.services.turn_common import ( gather_prior_edges, read_recent_dialogue, ) from chat.services.turn_parse import ParsedTurn, parse_turn from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you from chat.state.events import list_active_events from chat.state.meanwhile import list_meanwhile_scenes from 
chat.state.world import active_scene, get_chat, get_container from chat.web.bots import get_conn from chat.web.kickoff import get_llm_client from chat.web.meanwhile import process_meanwhile_turn from chat.web.pubsub import publish from chat.web.render import render_turn_html as _render_turn_html from chat.web.skip import ( ChatNotFoundError, _parse_iso_time, process_elision_skip, ) router = APIRouter() # Module-level registry of in-flight streaming tasks, keyed by chat_id. # The POST /chats//turns/cancel route looks up the task and calls # .cancel(); the streaming coroutine in post_turn catches the resulting # CancelledError, commits the partial as truncated, and unregisters. # Single-process v1 only — sufficient for one user with multiple tabs. _in_flight_tasks: dict[str, asyncio.Task] = {} def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str: """Concatenate non-OOC segments back to a prose string for the prompt. OOC segments (``((double parens))``) are kept in the user_turn payload for transcript display but stripped before assembly so the bot never sees author-to-system messages. """ keep = [s.text for s in parsed.segments if s.kind != "ooc"] return " ".join(keep).strip() def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]: """Return user-side and assistant_turn events for ``chat_id``. T83.2: thin delegate over :func:`chat.services.turn_common.read_recent_dialogue` so post_turn and regenerate share one implementation. The wrapper survives so the chat-detail template and other callers in this module don't all have to update at once. """ return read_recent_dialogue(conn, chat_id, limit=limit) def _detect_addressee_id( prose: str, host_bot: dict, guest_bot: dict | None ) -> str: """Return the bot id of the addressee for ``prose``. Phase 2 v1 uses a simple case-insensitive whole-word match. The host is the default — addressee flips to guest only when the guest's name appears in the prose AND the host's does not. 
If both names match or neither matches, the host keeps the floor. This bias keeps the primary speaker stable across ambiguous prose; the interjection branch (later in the turn flow) is how the silent witness gets a word in edgewise when warranted. """ if guest_bot is None: return host_bot["id"] host_name = host_bot.get("name") or "" guest_name = guest_bot.get("name") or "" host_match = bool( host_name and re.search(rf"\b{re.escape(host_name)}\b", prose, re.IGNORECASE) ) guest_match = bool( guest_name and re.search(rf"\b{re.escape(guest_name)}\b", prose, re.IGNORECASE) ) if guest_match and not host_match: return guest_bot["id"] return host_bot["id"] def _gather_state_update_inputs( conn, *, host_bot: dict, guest_bot: dict | None, you_entity: dict, ) -> tuple[list[str], dict[str, str], dict[str, str], dict[tuple[str, str], dict]]: """Collect ``(present_ids, present_names, personas, prior_edges)`` for a multi-entity state-update pass. Phase 2 v1 always pairs ``you`` with the host and (when present) the guest. ``prior_edges`` falls back to the schema default 50/50 baseline when no row exists yet — that mirrors the Phase 1 single-pair flow. Order matters: the host comes first so the directed-pair iteration in :func:`compute_state_updates_for_present` matches the Phase 1 sequence (host->you, then you->host). Existing tests pin the canned- response queue to that order — keeping it stable means we don't have to reshuffle test fixtures across the Phase 2 cutover. """ present_ids: list[str] = [host_bot["id"], "you"] present_names: dict[str, str] = { host_bot["id"]: host_bot["name"], "you": you_entity.get("name") or "you", } personas: dict[str, str] = { host_bot["id"]: host_bot.get("persona") or "", "you": you_entity.get("persona") or "", } if guest_bot is not None: present_ids.append(guest_bot["id"]) present_names[guest_bot["id"]] = guest_bot["name"] personas[guest_bot["id"]] = guest_bot.get("persona") or "" # T83.2: directed-edge gather is shared with regenerate.py. 
def _gather_state_update_inputs(
    conn,
    *,
    host_bot: dict,
    guest_bot: dict | None,
    you_entity: dict,
) -> tuple[list[str], dict[str, str], dict[str, str], dict[tuple[str, str], dict]]:
    """Build the inputs for a multi-entity state-update pass.

    Returns ``(present_ids, present_names, personas, prior_edges)``.

    The present set is always the host, then ``"you"``, then the guest
    when one is supplied. That order is deliberate: the original author
    notes that the directed-pair iteration in
    :func:`compute_state_updates_for_present` and the canned-response
    order of existing tests depend on the host coming first.

    Missing personas become ``""`` and a missing ``you`` name becomes
    ``"you"``; bot names are read with ``[...]`` and so must be present.
    ``prior_edges`` is delegated to the shared ``gather_prior_edges``
    helper (T83.2, also used by regenerate.py).
    """
    # One (id, display name, persona) triple per present entity.
    roster: list[tuple[str, str, str]] = [
        (host_bot["id"], host_bot["name"], host_bot.get("persona") or ""),
        (
            "you",
            you_entity.get("name") or "you",
            you_entity.get("persona") or "",
        ),
    ]
    if guest_bot is not None:
        roster.append(
            (guest_bot["id"], guest_bot["name"], guest_bot.get("persona") or "")
        )

    present_ids: list[str] = [entity_id for entity_id, _, _ in roster]
    present_names: dict[str, str] = {
        entity_id: name for entity_id, name, _ in roster
    }
    personas: dict[str, str] = {
        entity_id: persona for entity_id, _, persona in roster
    }
    prior_edges = gather_prior_edges(conn, present_ids)
    return present_ids, present_names, personas, prior_edges
@router.post("/chats/{chat_id}/turns")
async def post_turn(
    chat_id: str,
    request: Request,
    prose: str = Form(""),
    conn=Depends(get_conn),
    client=Depends(get_llm_client),
):
    """Handle one user turn: parse, stream the reply over SSE, run follow-ups.

    Flow, in the order the code executes it:

    0. If an active meanwhile scene exists, hand the turn to
       ``process_meanwhile_turn`` and return.
    1. Parse the prose; ``skip_jump`` returns 422, ``skip_elision`` runs
       scene-close detection and ``process_elision_skip`` and returns.
    2. Append ``user_turn``; pick the addressee; append
       ``assistant_turn_started``.
    3. Assemble the prompt and stream tokens in a registered task so
       ``/turns/cancel`` can cancel it.
    4. Append ``assistant_turn`` (``truncated`` set on error/cancel),
       write memory, apply ``edge_update`` events, enqueue significance.
    5. Optionally stream an interjection from the silent witness.
    6. Run event-lifecycle detection and scene-close detection, consume
       pending meanwhile digests, publish completion + HTML events.

    Args:
        chat_id: Path parameter identifying the chat.
        request: Used for ``request.app`` / ``request.app.state``.
        prose: Form field with the user's turn text.
        conn: Connection injected via ``get_conn``.
        client: LLM client injected via ``get_llm_client``.

    Returns:
        ``Response(status_code=204)`` on the normal, meanwhile, and
        elision paths; a 422 ``JSONResponse`` for the ``skip_jump`` intent.

    Raises:
        HTTPException: 400 for blank prose or a ``ValueError`` from the
            meanwhile/elision controllers; 404 when the chat or its host
            bot is missing, or on ``ChatNotFoundError``.
        asyncio.CancelledError: re-raised at the end when either stream
            was cancelled, after the partial turn has been recorded.
    """
    if not prose.strip():
        raise HTTPException(status_code=400, detail="prose cannot be empty")

    chat = get_chat(conn, chat_id)
    if chat is None:
        raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
    host_bot = get_bot(conn, chat["host_bot_id"])
    if host_bot is None:
        # Defensive: chat row references a missing bot.
        raise HTTPException(
            status_code=404,
            detail=f"host bot not found: {chat['host_bot_id']}",
        )

    guest_bot = None
    guest_bot_id = chat.get("guest_bot_id")
    if guest_bot_id is not None:
        # T47's bot_reset cascade clears guest_bot_id from any chat that
        # referenced the deleted bot, so by the time we read it here it's
        # either None or a live bot id. The previous defensive
        # degrade-to-1:1 block (T44) was rendered dead by T47 and removed
        # in T74.4 — get_bot now returns a real row.
        guest_bot = get_bot(conn, guest_bot_id)

    settings = request.app.state.settings

    # 0. Meanwhile-mode short-circuit (T64). When an active meanwhile
    # scene is running on this chat, the turn flow is entirely between
    # the two bots — "you" is absent. The meanwhile controller mirrors
    # the post_turn shape but with no-you semantics: present_set_kind
    # ``host_guest`` in the prompt assembler, ``record_meanwhile_memory``
    # for witness flags, only 2 directed pairs in the state update, and
    # the assistant_turn payload tagged with ``meanwhile_scene_id`` so
    # alternation lookups can scope to this scene specifically. The
    # T62 skip-intent dispatch and the regular narrative path below
    # are skipped — a meanwhile beat is its own self-contained flow.
    if list_meanwhile_scenes(conn, chat_id, status="active"):
        try:
            await process_meanwhile_turn(
                conn,
                client,
                settings,
                chat_id=chat_id,
                prose=prose,
                app=request.app,
            )
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc))
        return Response(status_code=204)

    # 1. Parse turn (classifier).
    parsed = await parse_turn(
        client, model=settings.classifier_model, prose=prose
    )
    prompt_prose = _strip_ooc_for_prompt(parsed)

    # 1a. Skip-command short-circuit (T62). The parser may classify the
    # prose as a time-skip directive — in which case the regular
    # narrative path (addressee detection, narrative stream, post-turn
    # state-update + scene-close passes) is skipped entirely. Elision
    # runs through the shared controller in :mod:`chat.web.skip`; jump
    # is drawer-only for Phase 3 (the natural-language path returns
    # 422 directing the user to the drawer's jump form, where they can
    # supply structured ``notable_prose`` and a target time). Anything
    # not matching these intents falls through to the narrative branch.
    intent = getattr(parsed, "intent", "narrative") or "narrative"
    if intent == "skip_jump":
        # Drawer-only jump for Phase 3: parsing a free-form fiction-time
        # delta out of natural language ("next morning" -> ?) is fragile
        # enough that we'd rather route the user to the drawer form,
        # where they pick a concrete ISO time and an optional notable-
        # prose field. 422 = "request shape is understood, but the
        # required structured input lives on a different surface".
        return JSONResponse(
            {
                "error": (
                    "Jump skip requires the drawer's jump form for "
                    "notable_prose."
                )
            },
            status_code=422,
        )
    if intent == "skip_elision":
        # T82.2: run scene-close detection on the user's prose BEFORE
        # the skip controller fires. Prose like "fade out, skip an hour"
        # carries both a close signal and a skip directive; we want the
        # close summary to capture the closing scene's final beat (and
        # promote per-POV memories) before the time advances. Order
        # matters: scene close -> skip narration -> time advance.
        #
        # When there's no active scene (or the prose carries no close
        # signal) ``detect_scene_close`` returns the safe
        # ``should_close=False`` default and we drop straight to the
        # skip controller — same behavior as today, no extra cost.
        skip_scene = active_scene(conn, chat_id)
        if skip_scene is not None:
            container = None
            if skip_scene.get("container_id") is not None:
                container = get_container(conn, skip_scene["container_id"])
            container_name = container["name"] if container else "unknown"
            close_decision = await detect_scene_close(
                client,
                model=settings.classifier_model,
                prose=prose,
                current_container_name=container_name,
            )
            if close_decision.should_close:
                append_and_apply(
                    conn,
                    kind="scene_closed",
                    payload={
                        "scene_id": skip_scene["id"],
                        "ended_at": chat.get("time"),
                        "significance": 0,
                    },
                )
                await apply_scene_close_summary(
                    conn,
                    client,
                    classifier_model=settings.classifier_model,
                    chat_id=chat_id,
                    scene_id=skip_scene["id"],
                    host_bot_id=host_bot["id"],
                    timeout_s=settings.classifier_timeout_s,
                )

        # Derive ``new_time`` from the chat clock. Phase 3 stub: bump by
        # 1 hour. The drawer's elision form is the structured path when
        # the author wants a specific landing time; here the goal is
        # "elide the dull bit" and any sensible forward step is fine —
        # ``narrate_skip`` weaves the landing-state hint into the
        # transition prose so the prose carries the semantic time, not
        # the timestamp itself.
        cur_dt = _parse_iso_time(chat.get("time") or "")
        # When the clock cannot be parsed, pass the stored value (or "")
        # through unchanged rather than inventing a time.
        new_time = (
            (cur_dt + timedelta(hours=1)).isoformat()
            if cur_dt is not None
            else (chat.get("time") or "")
        )
        try:
            await process_elision_skip(
                conn,
                client,
                settings,
                chat_id=chat_id,
                new_time=new_time,
                landing_state_hint=getattr(parsed, "landing_state_hint", "") or "",
                app=request.app,
            )
        except ChatNotFoundError as exc:
            # Defensive: chat existence is checked above, so this only
            # fires on a TOCTOU race where the chat row is deleted
            # mid-request. T81 split the typed missing-chat case out of
            # the generic ValueError so we keep the 404 mapping here.
            raise HTTPException(status_code=404, detail=str(exc))
        except ValueError as exc:
            # Bad new_time is a stub-derivation bug rather than user
            # input — surface as 400 with the controller message.
            raise HTTPException(status_code=400, detail=str(exc))
        return Response(status_code=204)

    # 2. Append user_turn event.
    user_turn_event_id = append_event(
        conn,
        kind="user_turn",
        payload={
            "chat_id": chat_id,
            "prose": prose,
            "segments": [s.model_dump() for s in parsed.segments],
        },
    )

    # 3. Determine the addressee. Done before assistant_turn_started so the
    # placeholder reflects the bot the user is actually talking to (host
    # in 1:1, host-or-guest in multi-entity). T74.1 routes the multi-entity
    # case through the addressee classifier; the no-guest case still uses
    # the substring fast-path because there is nothing to classify when
    # only one bot is present (and a classifier round-trip there would
    # just be throughput overhead).
    if guest_bot is None:
        addressee_id = _detect_addressee_id(prose, host_bot, guest_bot)
    else:
        decision = await detect_addressee(
            client,
            classifier_model=settings.classifier_model,
            user_prose=prose,
            host_id=host_bot["id"],
            host_name=host_bot["name"],
            guest_id=guest_bot["id"],
            guest_name=guest_bot["name"],
            timeout_s=settings.classifier_timeout_s,
        )
        addressee_id = decision.addressee_id
    # Any id other than the guest's resolves to the host.
    addressee_bot = (
        guest_bot
        if (guest_bot is not None and addressee_id == guest_bot["id"])
        else host_bot
    )

    # 4. Append assistant_turn_started placeholder. ``user_turn``,
    # ``assistant_turn_started``, and ``assistant_turn`` have no registered
    # projector handlers — they live in the event_log purely for transcript
    # rendering — so we don't call ``project`` here. (Re-projecting now would
    # also re-run prior non-idempotent inserts like ``chat_created``.)
    append_event(
        conn,
        kind="assistant_turn_started",
        payload={
            "chat_id": chat_id,
            "speaker_id": addressee_bot["id"],
            "user_turn_id": user_turn_event_id,
        },
    )

    # 5. Build the narrative prompt for the addressee. ``guest_id`` is
    # passed explicitly so the prompt assembler renders the guest's
    # activity / group-node block when applicable. The assembler is
    # tolerant of ``guest_id is None`` so this is a no-op for 1:1 chats.
    recent = _read_recent_dialogue(conn, chat_id, limit=20)
    # Drop the just-appended user turn from ``recent`` — it's passed as
    # ``user_turn_prose`` to the assembler and would otherwise duplicate.
    if recent and recent[-1].get("speaker") == "you":
        recent = recent[:-1]
    messages = assemble_narrative_prompt(
        conn,
        chat_id=chat_id,
        speaker_bot_id=addressee_bot["id"],
        user_turn_prose=prompt_prose if prompt_prose else None,
        recent_dialogue=recent,
        budget_soft=settings.narrative_budget_soft,
        budget_hard=settings.narrative_budget_hard,
        guest_id=guest_bot_id,
    )

    # 6. Stream and accumulate tokens. The stream runs as a Task so the
    # /turns/cancel route can invoke ``Task.cancel()`` to abort it
    # mid-stream. ``accumulated`` is a closure over the inner coroutine,
    # so when the await on ``stream_task`` raises CancelledError below
    # we still see whatever tokens were appended before cancellation.
    primary_accumulated: list[str] = []
    primary_truncated = False
    cancelled = False

    async def _stream_primary() -> None:
        async for chunk in client.stream(
            messages,
            model=settings.narrative_model,
            max_tokens=settings.narrative_max_tokens,
            temperature=settings.narrative_temperature,
        ):
            primary_accumulated.append(chunk)
            await publish(
                chat_id,
                {
                    "event": "token",
                    "text": chunk,
                    "speaker_id": addressee_bot["id"],
                },
            )

    stream_task = asyncio.create_task(_stream_primary())
    # Registry is keyed by chat_id only, so this replaces any entry
    # already stored for the same chat.
    _in_flight_tasks[chat_id] = stream_task
    try:
        await stream_task
    except asyncio.CancelledError:
        # Preserve the partial output before letting the cancellation
        # propagate so the transcript reflects what the user actually saw.
        primary_truncated = True
        cancelled = True
    except Exception:
        # Surface as a truncated turn rather than losing the partial output.
        primary_truncated = True
    finally:
        # Always unregister so a subsequent turn can register a fresh task.
        _in_flight_tasks.pop(chat_id, None)

    primary_text = "".join(primary_accumulated)

    # 7. Append the assistant_turn with the final text. (See note above on
    # why we skip ``project`` for these transcript-only event kinds.)
    # Capture the returned event id so we can stamp ``id="turn-<event_id>"``
    # on the SSE-emitted HTML fragment — the chat-page ``turn_html_replace``
    # handler relies on the id to swap regenerated turns in-place
    # (T86 follow-up).
    primary_assistant_event_id = append_event(
        conn,
        kind="assistant_turn",
        payload={
            "chat_id": chat_id,
            "speaker_id": addressee_bot["id"],
            "text": primary_text,
            "truncated": primary_truncated,
            "user_turn_id": user_turn_event_id,
        },
    )

    # 7a. Per-turn memory write (Plan §11.1, T21 / T41). With a guest
    # present this fans out to one ``memory_written`` event per witness
    # (host + guest); without a guest it preserves the Phase 1 single
    # write keyed on the host. Witness flags are set inside the helper.
    scene = active_scene(conn, chat_id)
    memory_results = record_turn_memory_for_present(
        conn,
        chat_id=chat_id,
        host_bot_id=host_bot["id"],
        guest_bot_id=guest_bot_id,
        narrative_text=primary_text,
        scene_id=scene["id"] if scene else None,
        chat_clock_at=chat.get("time"),
        app=request.app,
    )

    # 7b. Post-turn state-update pass (Requirements §3.4 / T40). All
    # directed pairs over the present entities — 2 pairs for 1:1, 6 for
    # 3-entity scenes. Run sequentially via the inner helper which honors
    # the Featherless 2-conn cap.
    you_entity = get_you(conn) or {"name": "you", "persona": ""}
    last_at = chat.get("time")
    recent_for_update = _read_recent_dialogue(conn, chat_id, limit=10)
    present_ids, present_names, personas, prior_edges = (
        _gather_state_update_inputs(
            conn,
            host_bot=host_bot,
            guest_bot=guest_bot,
            you_entity=you_entity,
        )
    )
    state_updates = await compute_state_updates_for_present(
        client,
        classifier_model=settings.classifier_model,
        present_ids=present_ids,
        present_names=present_names,
        personas=personas,
        prior_edges=prior_edges,
        recent_dialogue=recent_for_update,
        timeout_s=settings.classifier_timeout_s,
    )
    for src_id, tgt_id, update in state_updates:
        append_and_apply(
            conn,
            kind="edge_update",
            payload={
                "source_id": src_id,
                "target_id": tgt_id,
                "chat_id": chat_id,
                "affinity_delta": update.affinity_delta,
                "trust_delta": update.trust_delta,
                "knowledge_facts": update.knowledge_facts,
                "last_interaction_at": last_at,
                "last_interaction_chat_id": chat_id,
            },
        )

    # 7c. Enqueue the async significance pass (Plan §11.1, T22). The
    # worker scores the just-written memory 0-3, updates significance,
    # and auto-pins on score 3 with the §8.5 soft-cap eviction rule.
    # Phase 2 picks the host's memory id as the canonical input — guest
    # POV memories piggyback on the same significance score (the prose
    # they record is identical for v2; per-POV rewrite happens at scene
    # close in T45 and downstream-of-significance).
    worker = getattr(request.app.state, "background_worker", None)
    host_event_memory = memory_results.get(host_bot["id"])
    # NOTE(review): index 1 is presumably the memory id of an
    # (event_id, memory_id) pair — confirm against the helper's return.
    host_memory_id = host_event_memory[1] if host_event_memory else None
    if worker is not None and host_memory_id is not None:
        worker.enqueue(
            SignificanceJob(
                memory_id=host_memory_id,
                narrative_text=primary_text,
                prior_dialogue=recent_for_update,
                host_bot_id=host_bot["id"],
            )
        )

    # 8. Interjection branch (T39 / T44). Only fires when the chat has a
    # guest AND the addressee was the bot we *can* interject for (i.e.
    # not the lone bot in a 1:1 chat). The silent witness is whichever
    # bot didn't get the addressee slot. We only run this when the
    # primary stream actually completed — a cancelled or errored primary
    # short-circuits the follow-on so we don't classifier-spam against a
    # half-formed beat.
    interjection_text: str | None = None
    interjection_speaker_id: str | None = None
    interjection_truncated = False
    interjection_event_id: int | None = None
    if (
        guest_bot is not None
        and not cancelled
        and not primary_truncated
        and primary_text.strip()
    ):
        # Identify the silent witness — the bot that is NOT the addressee.
        if addressee_id == host_bot["id"]:
            silent_witness = guest_bot
        else:
            silent_witness = host_bot

        # Missing edge rows fall back to a neutral 50/50 baseline.
        edge_w_to_addr = get_edge(
            conn, silent_witness["id"], addressee_bot["id"]
        ) or {"affinity": 50, "trust": 50, "summary": ""}
        edge_w_to_you = get_edge(conn, silent_witness["id"], "you") or {
            "affinity": 50,
            "trust": 50,
            "summary": "",
        }

        decision = await detect_interjection(
            client,
            classifier_model=settings.classifier_model,
            addressee_name=addressee_bot["name"],
            addressee_just_said=primary_text,
            silent_witness_name=silent_witness["name"],
            silent_witness_persona=silent_witness.get("persona") or "",
            silent_witness_edge_to_addressee=edge_w_to_addr,
            silent_witness_edge_to_you=edge_w_to_you,
            you_just_said=prose,
            timeout_s=settings.classifier_timeout_s,
        )

        if decision.should_interject:
            interjection_speaker_id = silent_witness["id"]
            # Re-read recent_dialogue so the just-appended assistant_turn
            # (the addressee's beat) is in the prompt context.
            interject_recent = _read_recent_dialogue(conn, chat_id, limit=20)
            if interject_recent and interject_recent[-1].get("speaker") == "you":
                interject_recent = interject_recent[:-1]
            interject_messages = assemble_narrative_prompt(
                conn,
                chat_id=chat_id,
                speaker_bot_id=silent_witness["id"],
                addressee=addressee_bot["id"],
                user_turn_prose=prompt_prose if prompt_prose else None,
                recent_dialogue=interject_recent,
                budget_soft=settings.narrative_budget_soft,
                budget_hard=settings.narrative_budget_hard,
                guest_id=guest_bot_id,
            )

            interject_accumulated: list[str] = []

            async def _stream_interjection() -> None:
                async for chunk in client.stream(
                    interject_messages,
                    model=settings.narrative_model,
                    max_tokens=settings.narrative_max_tokens,
                    temperature=settings.narrative_temperature,
                ):
                    interject_accumulated.append(chunk)
                    await publish(
                        chat_id,
                        {
                            "event": "token",
                            "text": chunk,
                            "speaker_id": silent_witness["id"],
                        },
                    )

            interject_task = asyncio.create_task(_stream_interjection())
            # Re-register under the same chat_id so the cancel route can
            # reach the interjection stream as well.
            _in_flight_tasks[chat_id] = interject_task
            try:
                await interject_task
            except asyncio.CancelledError:
                interjection_truncated = True
                cancelled = True
            except Exception:
                interjection_truncated = True
            finally:
                _in_flight_tasks.pop(chat_id, None)

            interjection_text = "".join(interject_accumulated)
            # Capture the event id (T86 follow-up) so the SSE fragment
            # below carries ``id="turn-<event_id>"`` for in-place swap.
            interjection_event_id = append_event(
                conn,
                kind="assistant_turn",
                payload={
                    "chat_id": chat_id,
                    "speaker_id": silent_witness["id"],
                    "text": interjection_text,
                    "truncated": interjection_truncated,
                    "user_turn_id": user_turn_event_id,
                    "interjection_of": addressee_bot["id"],
                },
            )

            # Skip the downstream classifier passes if the interjection
            # was cancelled mid-stream — we don't want to score a partial
            # beat the user never got to read in full.
            if not interjection_truncated:
                # Re-run the multi-pair state update — the interjector
                # adding their voice plausibly shifts edges for everyone
                # in the room. Idempotent enough for v2 (deltas accumulate;
                # no stale state). Re-read recent so the just-appended
                # interjection turn is in scope.
                recent_post_interject = _read_recent_dialogue(
                    conn, chat_id, limit=10
                )
                # Re-fetch prior edges so deltas land on the post-primary
                # state rather than the pre-turn baseline.
                _, _, _, prior_edges_post = _gather_state_update_inputs(
                    conn,
                    host_bot=host_bot,
                    guest_bot=guest_bot,
                    you_entity=you_entity,
                )
                state_updates_post = await compute_state_updates_for_present(
                    client,
                    classifier_model=settings.classifier_model,
                    present_ids=present_ids,
                    present_names=present_names,
                    personas=personas,
                    prior_edges=prior_edges_post,
                    recent_dialogue=recent_post_interject,
                    timeout_s=settings.classifier_timeout_s,
                )
                for src_id, tgt_id, update in state_updates_post:
                    append_and_apply(
                        conn,
                        kind="edge_update",
                        payload={
                            "source_id": src_id,
                            "target_id": tgt_id,
                            "chat_id": chat_id,
                            "affinity_delta": update.affinity_delta,
                            "trust_delta": update.trust_delta,
                            "knowledge_facts": update.knowledge_facts,
                            "last_interaction_at": last_at,
                            "last_interaction_chat_id": chat_id,
                        },
                    )

                # Memory write for the interjection beat — a second pair
                # of memory_written events (host + guest POVs).
                interject_memory_results = record_turn_memory_for_present(
                    conn,
                    chat_id=chat_id,
                    host_bot_id=host_bot["id"],
                    guest_bot_id=guest_bot_id,
                    narrative_text=interjection_text,
                    scene_id=scene["id"] if scene else None,
                    chat_clock_at=chat.get("time"),
                    app=request.app,
                )

                # T74.2: enqueue a significance pass for the interjection
                # memory. Mirrors the primary-turn enqueue pattern above —
                # we score on the host's memory id since the prose is
                # identical across both POVs (per-POV rewrite happens at
                # scene close in T45). Without this enqueue the
                # interjection beat lands in memory but never gets scored,
                # so it can never auto-pin even when it carries a pivotal
                # moment.
                interject_host_event = interject_memory_results.get(
                    host_bot["id"]
                )
                interject_host_memory_id = (
                    interject_host_event[1] if interject_host_event else None
                )
                if (
                    worker is not None
                    and interject_host_memory_id is not None
                ):
                    worker.enqueue(
                        SignificanceJob(
                            memory_id=interject_host_memory_id,
                            narrative_text=interjection_text,
                            prior_dialogue=recent_post_interject,
                            host_bot_id=host_bot["id"],
                        )
                    )

    # 8a. Event-lifecycle detection (Phase 3, T61). Runs after the post-turn
    # classifier passes (memory write + state update + optional
    # interjection) and BEFORE scene-close detection. The classifier reads
    # ``primary_text`` against the chat's currently-active events and
    # returns a (usually empty) list of transitions. Each transition lands
    # an ``event_started`` / ``event_completed`` / ``event_cancelled``
    # event via ``append_and_apply`` so the events projection updates
    # synchronously. A completion is followed inline by
    # ``promote_completed_event`` so any structured artifacts the event
    # carries (knowledge_facts, relationship_change, acquired_objects)
    # land in state in the same turn — see chat/services/event_promotion.
    #
    # ``detect_event_transitions`` short-circuits when ``active_events``
    # is empty (per T52), so chats without active events don't pay a
    # classifier round-trip and existing fixtures need no extra canned
    # slots.
    active_events = list_active_events(conn, chat_id)
    if active_events:
        lifecycle_decision = await detect_event_transitions(
            client,
            classifier_model=settings.classifier_model,
            narrative_text=primary_text,
            active_events=active_events,
            timeout_s=settings.classifier_timeout_s,
        )
        for transition in lifecycle_decision.transitions:
            if transition.new_status == "active":
                append_and_apply(
                    conn,
                    kind="event_started",
                    payload={
                        "event_id": transition.event_id,
                        "started_at": chat.get("time"),
                    },
                )
            elif transition.new_status == "completed":
                append_and_apply(
                    conn,
                    kind="event_completed",
                    payload={
                        "event_id": transition.event_id,
                        "completed_at": chat.get("time"),
                    },
                )
                # Run promotion inline so the artifact-emitting events
                # (edge_update / manual_edit) land synchronously after
                # the completion. ``promote_completed_event`` is
                # synchronous (no await) and skips silently when the
                # event row's status isn't 'completed' — a safety net
                # for races, not expected to trigger in practice.
                promote_completed_event(
                    conn,
                    event_id=transition.event_id,
                    chat_id=chat_id,
                    chat_clock_at=chat.get("time"),
                )
            elif transition.new_status == "cancelled":
                append_and_apply(
                    conn,
                    kind="event_cancelled",
                    payload={
                        "event_id": transition.event_id,
                        "completed_at": chat.get("time"),
                    },
                )
            # Any other ``new_status`` value falls through silently —
            # the lifecycle service constrains the schema to the three
            # valid transitions, and a defensive no-op here keeps the
            # turn flow tolerant of unexpected outputs.

    # 9. Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn
    # and the optional interjection so the bots' responses are part of
    # the closing scene's final beat — closing before narrative would
    # force the bot to speak "in no scene", which is awkward. Hard
    # signals only in Phase 1: container change parsed from prose, or
    # explicit "fade out" / "we're done here" patterns. On classifier
    # failure the service returns ``should_close=False`` so the turn
    # flow keeps moving; the manual close button in the drawer is the
    # always-available fallback.
    #
    # Skip empty prose — no signal to classify and no point spending a
    # round-trip. Skip when there's no active scene (e.g. after a prior
    # close in the same chat) — we have nothing to close. T13 (kickoff)
    # is the only scene-opener path in v1; Phase 2-3 will handle
    # automatic re-opening with the next container.
    #
    # T74.3: this branch deliberately runs even when ``cancelled`` is
    # True. Close detection consumes only the user's prose (which is
    # fully appended to the event_log BEFORE streaming starts) and the
    # current container name; it does NOT consume the bot's output.
    # A user who types "we're done here, fade out" and then hits Stop
    # mid-stream still meant to close the scene — the cancelled bot
    # beat doesn't invalidate that intent. Pinned by
    # test_cancelled_turn_still_closes_scene_when_user_prose_signals_close.
    if scene is not None and prose.strip():
        container = None
        if scene.get("container_id") is not None:
            container = get_container(conn, scene["container_id"])
        container_name = container["name"] if container else "unknown"
        decision = await detect_scene_close(
            client,
            model=settings.classifier_model,
            prose=prose,
            current_container_name=container_name,
        )
        if decision.should_close:
            append_and_apply(
                conn,
                kind="scene_closed",
                payload={
                    "scene_id": scene["id"],
                    "ended_at": chat.get("time"),
                    # T27 promotes the per-POV summary into ``edges.summary``
                    # but doesn't currently set scene significance — the
                    # async significance pass (T22) operates on memories.
                    "significance": 0,
                },
            )
            # T27 / T45: per-POV summary + edge summary update + knowledge
            # promotion for each present witness (host always; guest when
            # present). Runs synchronously after the close so the next
            # turn (or a subsequent GET /chats/{chat_id}) sees the rewritten
            # memories and edge summaries. Tolerates classifier failure
            # (returns the empty default and skips the writes).
            await apply_scene_close_summary(
                conn,
                client,
                classifier_model=settings.classifier_model,
                chat_id=chat_id,
                scene_id=scene["id"],
                host_bot_id=host_bot["id"],
                timeout_s=settings.classifier_timeout_s,
            )

    # 9a. Consume any pending meanwhile digests now that the assistant_turn
    # (which surfaced them in its prompt via T65's helper) has landed. The
    # spec's "first you-turn AFTER meanwhile close consumes the digest"
    # semantics are preserved by running this AFTER scene-close detection
    # — anything pending right now belongs to the prompt we just answered,
    # so it's safe to mark consumed and the NEXT turn starts clean.
    # Idempotent: re-calling produces zero events when nothing's pending.
    consume_pending_meanwhile_digests(conn, chat_id)

    # 10. Broadcast a JSON completion event (for JS consumers) and an HTML
    # fragment event (for HTMX SSE swap-into-timeline). One pair per
    # written assistant_turn so the timeline ends up with both the
    # primary and the interjection beat in the right order.
    await publish(
        chat_id,
        {
            "event": "assistant_turn_complete",
            "speaker_id": addressee_bot["id"],
            "text": primary_text,
            "truncated": primary_truncated,
        },
    )
    primary_html = _render_turn_html(
        addressee_bot["name"],
        primary_text,
        role="bot",
        event_id=primary_assistant_event_id,
    )
    await publish(
        chat_id, {"event": "turn_html", "data": primary_html}
    )

    if interjection_text is not None and interjection_speaker_id is not None:
        # The interjector's display name is whichever bot wasn't the
        # addressee — pull it from the in-scope variable directly.
        interject_speaker_name = (
            host_bot["name"]
            if interjection_speaker_id == host_bot["id"]
            else (guest_bot["name"] if guest_bot is not None else "bot")
        )
        await publish(
            chat_id,
            {
                "event": "assistant_turn_complete",
                "speaker_id": interjection_speaker_id,
                "text": interjection_text,
                "truncated": interjection_truncated,
            },
        )
        interject_html = _render_turn_html(
            interject_speaker_name,
            interjection_text,
            role="bot",
            event_id=interjection_event_id,
        )
        await publish(
            chat_id, {"event": "turn_html", "data": interject_html}
        )

    if cancelled:
        # Re-raise after the partial-turn has been recorded.
        raise asyncio.CancelledError

    return Response(status_code=204)
# Step 10, continued: publish the interjection beat's completion + HTML pair.
        interject_speaker_name = (
            host_bot["name"]
            if interjection_speaker_id == host_bot["id"]
            else (guest_bot["name"] if guest_bot is not None else "bot")
        )
        await publish(
            chat_id,
            {
                "event": "assistant_turn_complete",
                "speaker_id": interjection_speaker_id,
                "text": interjection_text,
                "truncated": interjection_truncated,
            },
        )
        interject_html = _render_turn_html(
            interject_speaker_name,
            interjection_text,
            role="bot",
            event_id=interjection_event_id,
        )
        await publish(
            chat_id, {"event": "turn_html", "data": interject_html}
        )

    if cancelled:
        # Re-raise after the partial-turn has been recorded.
        raise asyncio.CancelledError

    return Response(status_code=204)


# ---------------------------------------------------------------------------
# Cancel route (Task 34).
#
# Fire-and-forget: the Stop button POSTs here, we mark the in-flight
# streaming Task as cancelled, and return 204 immediately. The cancel
# propagates into the streaming coroutine on its next await, the
# CancelledError handler in ``post_turn`` catches it, and the partial
# is committed with ``truncated=True``. No body is needed — the SSE
# channel is the conveyor of state. If no turn is in flight (or the
# task already completed), we 204 silently so the client can fire the
# Stop button without a precondition check.
# ---------------------------------------------------------------------------


@router.post("/chats/{chat_id}/turns/cancel")
async def cancel_turn(chat_id: str, request: Request):
    """Request cancellation of the chat's in-flight turn task, if any.

    Looks up ``chat_id`` in ``_in_flight_tasks``. When a task is
    registered and has not finished, ``task.cancel()`` is called on it.
    The response is ``204 No Content`` in every case, including when
    there is nothing to cancel. ``request`` is accepted but not read.
    """
    task = _in_flight_tasks.get(chat_id)
    # Nothing registered, or the task already finished: nothing to cancel.
    if task is None or task.done():
        return Response(status_code=204)
    # cancel() only requests cancellation; this route does not wait for
    # the task to actually stop.
    task.cancel()
    return Response(status_code=204)


# ---------------------------------------------------------------------------
# Rewind routes (Task 28).
#
# Two endpoints: a GET that renders the impact-preview modal, and a POST
# that actually executes the rewind.
The execution path opens its own # database connection because the route's ``conn`` is closed when the # dependency-injection scope exits — passing it to ``execute_rewind`` # would dangle. # --------------------------------------------------------------------------- @router.get( "/chats/{chat_id}/rewind/preview/{event_id}", response_class=HTMLResponse, ) async def rewind_preview( chat_id: str, event_id: int, request: Request, conn=Depends(get_conn), ): """Render the rewind impact-preview modal as a small HTML fragment. The HTMX form inside the fragment posts to the execute endpoint below. v1 keeps the markup minimal — Task 35 polishes the modal. """ chat = get_chat(conn, chat_id) if chat is None: raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}") preview = compute_rewind_preview(conn, event_id) items = "".join( f"
  • {count} × {html.escape(kind)}
  • " for kind, count in preview["by_kind"].items() ) body = ( "
    " f"

    Rewind to event {event_id}?

    " f"

    This will remove {preview['total_events']} events:

    " f"
      {items}
    " f"
    " "" "
    " "
    " ) return HTMLResponse(body) # --------------------------------------------------------------------------- # Regenerate route (Task 29). # # A POST that re-streams the most recent assistant turn. The prior # ``assistant_turn`` event is kept in the log but flagged # ``superseded_by`` so the timeline filter in :func:`_read_recent_dialogue` # hides it. When the user supplies ``prose`` the original ``user_turn`` # is also superseded by a fresh ``user_turn_edit`` event capturing the # edit. Significance is *not* re-run on regenerate (per plan §11.1) but # state-update + memory writes are. # --------------------------------------------------------------------------- @router.post("/chats/{chat_id}/turns/{event_id}/regenerate") async def regenerate_turn( chat_id: str, event_id: int, request: Request, prose: str | None = Form(None), conn=Depends(get_conn), client=Depends(get_llm_client), ): """Regenerate the assistant turn referenced by ``event_id``. ``prose`` is optional. When provided (and non-empty) we capture a ``user_turn_edit`` event before re-streaming. Returns 204 on success, 404 when the chat or assistant_turn event is missing. The SSE channel emits per-token events as the new text arrives. """ chat = get_chat(conn, chat_id) if chat is None: raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}") settings = request.app.state.settings # Local import keeps the module import graph flat (the service # imports from ``state`` / ``services`` siblings already). 
from chat.services.regenerate import regenerate_assistant_turn edited_prose = prose if prose else None try: await regenerate_assistant_turn( conn, client, settings=settings, chat_id=chat_id, original_assistant_event_id=event_id, edited_user_prose=edited_prose, app=request.app, ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) return Response(status_code=204) @router.post("/chats/{chat_id}/rewind/{event_id}") async def rewind_execute( chat_id: str, event_id: int, request: Request, conn=Depends(get_conn), ): """Execute the rewind: snapshot, truncate event_log, re-project. Note: ``conn`` is only used to validate the chat exists. The actual rewind opens its own connection inside ``execute_rewind`` because we need it to commit independently and survive the route's dependency teardown. """ chat = get_chat(conn, chat_id) if chat is None: raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}") settings = request.app.state.settings execute_rewind( db_path=settings.db_path, data_dir=settings.data_dir, after_event_id=event_id, ) return RedirectResponse(url=f"/chats/{chat_id}", status_code=303)