feat: narrative streaming via SSE with assistant_turn event

This commit is contained in:
Joseph Doherty
2026-04-26 13:09:31 -04:00
parent 73d8b0c092
commit 9b45710cb1
6 changed files with 458 additions and 9 deletions
+2
View File
@@ -20,6 +20,7 @@ from chat.web.kickoff import router as kickoff_router
from chat.web.nav import router as nav_router
from chat.web.settings import router as settings_router
from chat.web.sse import router as sse_router
from chat.web.turns import router as turns_router
@asynccontextmanager
@@ -42,6 +43,7 @@ app.include_router(settings_router)
app.include_router(nav_router)
app.include_router(chat_router)
app.include_router(sse_router)
app.include_router(turns_router)
@app.get("/health")
+6 -2
View File
@@ -1,14 +1,18 @@
{% extends "layout.html" %}
{% block title %}{{ host_bot.name }} - chat{% endblock %}
{% block content %}
<div class="chat-shell" data-chat-id="{{ chat.id }}">
<div class="chat-shell" data-chat-id="{{ chat.id }}"
hx-ext="sse"
sse-connect="/chats/{{ chat.id }}/events">
<header class="chat-header">
<h1>{{ host_bot.name }}</h1>
<div class="chat-meta muted">{{ chat.time }}</div>
<button class="drawer-toggle" type="button" aria-controls="drawer" aria-expanded="false">Drawer</button>
</header>
<section class="timeline" id="timeline">
<section class="timeline" id="timeline"
sse-swap="turn_html"
hx-swap="beforeend">
{% if not turns %}
<p class="muted">No turns yet. Start typing below.</p>
{% else %}
+13 -2
View File
@@ -16,6 +16,7 @@ from fastapi.templating import Jinja2Templates
from chat.state.entities import get_bot
from chat.state.world import get_chat
from chat.web.bots import get_conn
from chat.web.turns import _read_recent_dialogue
TEMPLATES = Jinja2Templates(
directory=str(Path(__file__).resolve().parent.parent / "templates")
@@ -38,9 +39,19 @@ async def chat_detail(chat_id: str, request: Request, conn=Depends(get_conn)):
status_code=404, detail=f"host bot not found: {chat['host_bot_id']}"
)
# Phase 1, T15: timeline starts empty. T19 will populate from event_log
# by reading user_turn / assistant_turn events for this chat.
# T19: render the timeline from event_log. We pull both user_turn and
# assistant_turn events for this chat, in chronological order. Each row
# is shaped ``{"speaker": ..., "text": ...}`` and the template
# discriminates roles via the speaker id (the literal "you" vs. a bot id).
raw_turns = _read_recent_dialogue(conn, chat_id, limit=200)
turns: list[dict] = []
for t in raw_turns:
if t["speaker"] == "you":
turns.append({"role": "you", "speaker": "you", "text": t["text"]})
else:
bot = get_bot(conn, t["speaker"])
label = bot["name"] if bot else t["speaker"]
turns.append({"role": "bot", "speaker": label, "text": t["text"]})
return TEMPLATES.TemplateResponse(
request,
+21 -5
View File
@@ -32,9 +32,17 @@ router = APIRouter()
_KEEPALIVE_SECONDS = 15.0
def _format_sse(event: str, data: dict) -> bytes:
"""Format a single SSE frame: ``event: <name>\\ndata: <json>\\n\\n``."""
payload = json.dumps(data)
def _format_sse(event: str, data: dict | str) -> bytes:
"""Format a single SSE frame: ``event: <name>\\ndata: <body>\\n\\n``.
``data`` may be a dict (JSON-serialized) or a raw string. The string
form is used for HTMX SSE swaps where the payload is an HTML
fragment that the client splices into the DOM verbatim.
"""
if isinstance(data, str):
payload = data
else:
payload = json.dumps(data)
return f"event: {event}\ndata: {payload}\n\n".encode("utf-8")
@@ -62,10 +70,18 @@ async def chat_events(chat_id: str, request: Request, conn=Depends(get_conn)):
yield b": keepalive\n\n"
continue
# Allow publishers to set the SSE event name via "event" key;
# default to "message" if omitted.
# default to "message" if omitted. When the remaining payload
# is a single ``data`` string, send it verbatim — that lets
# turn-flow publishers ship pre-rendered HTML fragments that
# HTMX's SSE extension can swap into the DOM directly.
event = dict(event) # don't mutate the published dict
kind = event.pop("event", "message")
yield _format_sse(kind, event)
if set(event.keys()) == {"data"} and isinstance(
event["data"], str
):
yield _format_sse(kind, event["data"])
else:
yield _format_sse(kind, event)
finally:
await unsubscribe(chat_id, queue)
+239
View File
@@ -0,0 +1,239 @@
"""POST ``/chats/<id>/turns`` — narrative turn flow with SSE streaming.
The turn flow strings together the pieces built in T17 (turn parser), T18
(prompt assembler), and T16 (SSE channel):
1. Parse the user's prose with the classifier into typed segments.
2. Append a ``user_turn`` event capturing both the original prose and the
parsed segments.
3. Append a placeholder ``assistant_turn_started`` marker so observers know
a response is in flight.
4. Build the narrative prompt, dropping OOC segments before they reach the
bot (per Requirements §6.1 the OOC convention is for the author to talk
to the system, not to the in-fiction bot).
5. Stream tokens from the LLM, broadcasting each chunk over the chat's SSE
channel as a ``token`` event so any subscribed browser tab sees them
arrive in real time.
6. On stream complete, append an ``assistant_turn`` event with the full
text and ``truncated=False``. Also publish a ``turn_html`` event with a
ready-to-swap HTML fragment so HTMX's SSE extension can append it to
the timeline without a page reload.
7. Return ``204 No Content`` — the SSE channel is the real conveyor of
state, not the POST response body.
Errors during streaming flip the assistant_turn's ``truncated`` flag to
``True`` and we still commit what we received. ``asyncio.CancelledError``
is treated identically and re-raised after recording the partial turn.
"""
from __future__ import annotations
import asyncio
import html
import json
from fastapi import APIRouter, Depends, Form, HTTPException, Request
from fastapi.responses import Response
from chat.eventlog.log import append_event
from chat.services.prompt import assemble_narrative_prompt
from chat.services.turn_parse import ParsedTurn, parse_turn
from chat.state.world import get_chat
from chat.state.entities import get_bot
from chat.web.bots import get_conn
from chat.web.kickoff import get_llm_client
from chat.web.pubsub import publish
router = APIRouter()
def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str:
"""Concatenate non-OOC segments back to a prose string for the prompt.
OOC segments (``((double parens))``) are kept in the user_turn payload
for transcript display but stripped before assembly so the bot never
sees author-to-system messages.
"""
keep = [s.text for s in parsed.segments if s.kind != "ooc"]
return " ".join(keep).strip()
def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
"""Return ``user_turn`` and ``assistant_turn`` events for ``chat_id``.
Ordered oldest-first. Skips superseded and hidden rows so regenerated
turns (T29) drop out of the rendered timeline. Each entry is shaped
``{"speaker": <id-or-"you">, "text": <prose>}`` for the prompt
assembler and the chat-detail template.
"""
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
)
rows = cur.fetchall()
rows.reverse() # back to chronological order
out: list[dict] = []
for _row_id, kind, payload_json in rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
continue
if kind == "user_turn":
out.append({"speaker": "you", "text": p.get("prose", "")})
else:
out.append(
{
"speaker": p.get("speaker_id", "bot"),
"text": p.get("text", ""),
}
)
return out
def _render_turn_html(speaker_label: str, text: str, *, role: str) -> str:
"""Render a single turn as a small HTML fragment (escaped)."""
return (
f'<div class="turn turn-{role}">'
f"<strong>{html.escape(speaker_label)}</strong>"
f"<p>{html.escape(text)}</p>"
f"</div>"
)
@router.post("/chats/{chat_id}/turns")
async def post_turn(
chat_id: str,
request: Request,
prose: str = Form(""),
conn=Depends(get_conn),
client=Depends(get_llm_client),
):
chat = get_chat(conn, chat_id)
if chat is None:
raise HTTPException(status_code=404, detail=f"chat not found: {chat_id}")
host_bot = get_bot(conn, chat["host_bot_id"])
if host_bot is None:
# Defensive: chat row references a missing bot.
raise HTTPException(
status_code=404,
detail=f"host bot not found: {chat['host_bot_id']}",
)
settings = request.app.state.settings
# 1. Parse turn (classifier).
parsed = await parse_turn(
client, model=settings.classifier_model, prose=prose
)
prompt_prose = _strip_ooc_for_prompt(parsed)
# 2. Append user_turn event.
user_turn_event_id = append_event(
conn,
kind="user_turn",
payload={
"chat_id": chat_id,
"prose": prose,
"segments": [s.model_dump() for s in parsed.segments],
},
)
# 3. Append assistant_turn_started placeholder. ``user_turn``,
# ``assistant_turn_started``, and ``assistant_turn`` have no registered
# projector handlers — they live in the event_log purely for transcript
# rendering — so we don't call ``project`` here. (Re-projecting now would
# also re-run prior non-idempotent inserts like ``chat_created``.)
append_event(
conn,
kind="assistant_turn_started",
payload={
"chat_id": chat_id,
"speaker_id": host_bot["id"],
"user_turn_id": user_turn_event_id,
},
)
# 4. Build the narrative prompt.
recent = _read_recent_dialogue(conn, chat_id, limit=20)
# Drop the just-appended user turn from ``recent`` — it's passed as
# ``user_turn_prose`` to the assembler and would otherwise duplicate.
if recent and recent[-1].get("speaker") == "you":
recent = recent[:-1]
messages = assemble_narrative_prompt(
conn,
chat_id=chat_id,
speaker_bot_id=host_bot["id"],
user_turn_prose=prompt_prose if prompt_prose else None,
recent_dialogue=recent,
budget_soft=settings.narrative_budget_soft,
budget_hard=settings.narrative_budget_hard,
)
# 5. Stream and accumulate tokens.
accumulated: list[str] = []
truncated = False
cancelled = False
try:
async for chunk in client.stream(
messages, model=settings.narrative_model
):
accumulated.append(chunk)
await publish(
chat_id,
{
"event": "token",
"text": chunk,
"speaker_id": host_bot["id"],
},
)
except asyncio.CancelledError:
# Preserve the partial output before letting the cancellation
# propagate so the transcript reflects what the user actually saw.
truncated = True
cancelled = True
except Exception:
# Surface as a truncated turn rather than losing the partial output.
truncated = True
full_text = "".join(accumulated)
# 6. Append the assistant_turn with the final text. (See note above on
# why we skip ``project`` for these transcript-only event kinds.)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": chat_id,
"speaker_id": host_bot["id"],
"text": full_text,
"truncated": truncated,
"user_turn_id": user_turn_event_id,
},
)
# 7. Broadcast a JSON completion event (for JS consumers) and an HTML
# fragment event (for HTMX SSE swap-into-timeline).
await publish(
chat_id,
{
"event": "assistant_turn_complete",
"speaker_id": host_bot["id"],
"text": full_text,
"truncated": truncated,
},
)
assistant_html = _render_turn_html(
host_bot["name"], full_text, role="bot"
)
await publish(
chat_id, {"event": "turn_html", "data": assistant_html}
)
if cancelled:
# Re-raise after the partial-turn has been recorded.
raise asyncio.CancelledError
return Response(status_code=204)
+177
View File
@@ -0,0 +1,177 @@
"""End-to-end turn flow (T19): user POSTs prose, server parses, streams via SSE.
Covers:
- POST ``/chats/<id>/turns`` returns 404 when the chat doesn't exist.
- A successful POST appends both a ``user_turn`` and an ``assistant_turn``
event in chronological order. The assistant payload carries the full
streamed text and ``truncated=False``.
- After a turn lands, the chat detail GET renders the user prose and the
assistant text from the event log.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from chat.app import app
from chat.db.connection import open_db
from chat.eventlog.log import append_event
from chat.eventlog.projector import project
from chat.llm.mock import MockLLMClient
@pytest.fixture
def client(tmp_path, monkeypatch):
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
canned_response = "Hi there."
# Import here so env vars are visible to the dependency lookup.
from chat.web.kickoff import get_llm_client
mock = MockLLMClient(canned=[canned_parse, canned_response])
app.dependency_overrides[get_llm_client] = lambda: mock
with TestClient(app) as c:
c.mock_llm = mock # type: ignore[attr-defined]
yield c
app.dependency_overrides.clear()
def _seed(db_path: Path) -> None:
"""Author a bot, create a chat, and seed enough state for prompt assembly."""
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
# Seed an edge so the prompt assembler has something to render.
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
"knowledge_facts": ["coworker"],
},
)
# Activity for both speakers — required by the prompt assembler.
append_event(
conn,
kind="activity_change",
payload={
"entity_id": "you",
"posture": "sitting",
"action": {
"verb": "talking",
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
append_event(
conn,
kind="activity_change",
payload={
"entity_id": "bot_a",
"posture": "sitting",
"action": {
"verb": "listening",
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
def test_post_turn_404_when_chat_missing(client):
response = client.post("/chats/no_such/turns", data={"prose": "hello"})
assert response.status_code == 404
def test_post_turn_appends_user_and_assistant_events(client, tmp_path):
_seed(tmp_path / "test.db")
response = client.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
with open_db(tmp_path / "test.db") as conn:
cur = conn.execute(
"SELECT kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') ORDER BY id"
)
rows = cur.fetchall()
assert len(rows) == 2
assert rows[0][0] == "user_turn"
assert rows[1][0] == "assistant_turn"
user_payload = json.loads(rows[0][1])
assert user_payload["chat_id"] == "chat_bot_a"
assert user_payload["prose"] == "hello"
# Segments come from the canned classifier output.
assert any(
s.get("kind") == "dialogue" and s.get("text") == "hello"
for s in user_payload["segments"]
)
assistant_payload = json.loads(rows[1][1])
assert assistant_payload["chat_id"] == "chat_bot_a"
assert assistant_payload["speaker_id"] == "bot_a"
assert assistant_payload["text"] == "Hi there."
assert assistant_payload["truncated"] is False
def test_get_chat_renders_existing_turns(client, tmp_path):
_seed(tmp_path / "test.db")
post = client.post("/chats/chat_bot_a/turns", data={"prose": "hello"})
assert post.status_code == 204
response = client.get("/chats/chat_bot_a")
assert response.status_code == 200
body = response.text
assert "hello" in body
assert "Hi there." in body