feat: streaming UX with Stop, disconnect handling, send-lock

This commit is contained in:
Joseph Doherty
2026-04-26 14:27:39 -04:00
parent 330077afcf
commit 0353d592cd
4 changed files with 345 additions and 9 deletions
+10
View File
@@ -113,3 +113,13 @@ code { font-family: ui-monospace, "SF Mono", Menlo, monospace; }
.memory-list li { padding: 4px 0; font-size: 13px; }
.sig { display: inline-block; min-width: 16px; }
.sig-3 { color: #d4af37; }
/* Streaming UX (T34): typing indicator, Stop button, disconnect banner. */
.streaming { opacity: 0.85; }
.streaming-text:after {
content: "\025AE";
margin-left: 2px;
animation: blink 1s steps(2, start) infinite;
}
@keyframes blink { to { visibility: hidden; } }
.stop-streaming { background: #c33; border-color: #a00; margin-bottom: 8px; align-self: flex-start; }
.connection-lost { margin-bottom: 8px; }
+103
View File
@@ -46,4 +46,107 @@ document.querySelector('.drawer-toggle')?.addEventListener('click', (e) => {
e.target.setAttribute('aria-expanded', String(isHidden));
});
</script>
<script>
// Streaming UX (T34): typing indicator, Stop button, send-lock,
// disconnect banner. Listens to the existing HTMX SSE channel for
// `token` (per-chunk) and `turn_html` (final swap) events. The
// mid-stream disconnect path is server-side: ``request.is_disconnected()``
// in T19 commits truncated; this script just shows the banner when
// the SSE EventSource fires `error` after the connection drops.
(function () {
const shell = document.querySelector('.chat-shell');
if (!shell) return;
const chatId = shell.dataset.chatId;
const form = shell.querySelector('.turn-input');
if (!form) return;
const textarea = form.querySelector('textarea[name="prose"]');
const sendBtn = form.querySelector('button[type="submit"]');
const timeline = document.getElementById('timeline');
let isStreaming = false;
let typingEl = null;
function ensureTypingEl() {
if (typingEl) return typingEl;
typingEl = document.createElement('div');
typingEl.className = 'turn turn-bot streaming';
typingEl.innerHTML = '<strong>...</strong><p class="streaming-text"></p>';
timeline.appendChild(typingEl);
return typingEl;
}
function unlock() {
isStreaming = false;
if (sendBtn) sendBtn.disabled = false;
if (textarea) textarea.disabled = false;
const stop = shell.querySelector('.stop-streaming');
if (stop) stop.remove();
}
function showBanner(msg) {
let banner = shell.querySelector('.connection-lost');
if (banner) return;
banner = document.createElement('div');
banner.className = 'connection-lost error';
banner.textContent = msg;
form.parentElement.insertBefore(banner, form);
}
// HTMX SSE extension dispatches `htmx:sseMessage` with detail.type
// (event name) and detail.data (payload string).
shell.addEventListener('htmx:sseMessage', (e) => {
const evt = e.detail.type;
const data = e.detail.data;
if (evt === 'token' && isStreaming) {
let parsed;
try { parsed = JSON.parse(data); } catch (_) { return; }
const el = ensureTypingEl();
el.querySelector('.streaming-text').textContent += (parsed.text || '');
} else if (evt === 'turn_html') {
// The server already pushes the final HTML via sse-swap on the
// timeline element; we just remove the typing placeholder and
// unlock the input. (Don't replace innerHTML here — HTMX has
// already done the append by the time this fires.)
if (typingEl) {
typingEl.remove();
typingEl = null;
}
unlock();
}
});
// SSE connection lost — show a banner and unlock so the user can
// retry. The server commits the partial as truncated when its
// request.is_disconnected() poll trips (T19).
shell.addEventListener('htmx:sseError', () => {
if (isStreaming) {
showBanner('connection lost — partial response saved');
unlock();
}
});
form.addEventListener('submit', () => {
isStreaming = true;
if (sendBtn) sendBtn.disabled = true;
if (textarea) textarea.disabled = true;
if (!shell.querySelector('.stop-streaming')) {
const stopBtn = document.createElement('button');
stopBtn.type = 'button';
stopBtn.className = 'stop-streaming btn';
stopBtn.textContent = 'Stop';
stopBtn.addEventListener('click', async () => {
try {
await fetch('/chats/' + encodeURIComponent(chatId) + '/turns/cancel', {
method: 'POST',
});
} catch (_) {
// Network error on cancel is non-fatal — server will time out
// its own stream eventually and commit truncated.
}
});
form.parentElement.insertBefore(stopBtn, form);
}
});
})();
</script>
{% endblock %}
+56 -9
View File
@@ -58,6 +58,14 @@ from chat.web.render import render_turn_html as _render_turn_html
router = APIRouter()
# Module-level registry of in-flight streaming tasks, keyed by chat_id.
# The POST /chats/<id>/turns/cancel route looks up the task and calls
# .cancel(); the streaming coroutine in post_turn catches the resulting
# CancelledError, commits the partial as truncated, and unregisters.
# Single-process v1 only — sufficient for one user with multiple tabs.
_in_flight_tasks: dict[str, asyncio.Task] = {}
def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str:
"""Concatenate non-OOC segments back to a prose string for the prompt.
@@ -70,16 +78,17 @@ def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str:
def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
"""Return ``user_turn`` and ``assistant_turn`` events for ``chat_id``.
"""Return user-side and assistant_turn events for ``chat_id``.
Ordered oldest-first. Skips superseded and hidden rows so regenerated
turns (T29) drop out of the rendered timeline. Each entry is shaped
``{"speaker": <id-or-"you">, "text": <prose>}`` for the prompt
assembler and the chat-detail template.
Includes ``user_turn``, ``user_turn_edit`` (T29 edited prose), and
``assistant_turn``. Ordered oldest-first; superseded/hidden rows are
skipped so regenerated turns (T29) drop out of the rendered timeline.
Each entry is shaped ``{"speaker": <id-or-"you">, "text": <prose>}``
for the prompt assembler and the chat-detail template.
"""
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'assistant_turn') "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
@@ -91,7 +100,9 @@ def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
continue
if kind == "user_turn":
if kind in ("user_turn", "user_turn_edit"):
# Edited prose substitutes for the original user_turn (the
# original is marked superseded_by and filtered above).
out.append({"speaker": "you", "text": p.get("prose", "")})
else:
out.append(
@@ -173,11 +184,16 @@ async def post_turn(
budget_hard=settings.narrative_budget_hard,
)
# 5. Stream and accumulate tokens.
# 5. Stream and accumulate tokens. The stream runs as a Task so the
# /turns/cancel route can invoke ``Task.cancel()`` to abort it
# mid-stream. ``accumulated`` is a closure over the inner coroutine,
# so when the await on ``stream_task`` raises CancelledError below
# we still see whatever tokens were appended before cancellation.
accumulated: list[str] = []
truncated = False
cancelled = False
try:
async def _stream() -> None:
async for chunk in client.stream(
messages, model=settings.narrative_model
):
@@ -190,6 +206,11 @@ async def post_turn(
"speaker_id": host_bot["id"],
},
)
stream_task = asyncio.create_task(_stream())
_in_flight_tasks[chat_id] = stream_task
try:
await stream_task
except asyncio.CancelledError:
# Preserve the partial output before letting the cancellation
# propagate so the transcript reflects what the user actually saw.
@@ -198,6 +219,9 @@ async def post_turn(
except Exception:
# Surface as a truncated turn rather than losing the partial output.
truncated = True
finally:
# Always unregister so a subsequent turn can register a fresh task.
_in_flight_tasks.pop(chat_id, None)
full_text = "".join(accumulated)
@@ -403,6 +427,29 @@ async def post_turn(
return Response(status_code=204)
# ---------------------------------------------------------------------------
# Cancel route (Task 34).
#
# Fire-and-forget: the Stop button POSTs here, we mark the in-flight
# streaming Task as cancelled, and return 204 immediately. The cancel
# propagates into the streaming coroutine on its next await, the
# CancelledError handler in ``post_turn`` catches it, and the partial
# is committed with ``truncated=True``. No body is needed — the SSE
# channel is the conveyor of state. If no turn is in flight (or the
# task already completed), we 204 silently so the client can fire the
# Stop button without a precondition check.
# ---------------------------------------------------------------------------
@router.post("/chats/{chat_id}/turns/cancel")
async def cancel_turn(chat_id: str, request: Request):
task = _in_flight_tasks.get(chat_id)
if task is None or task.done():
return Response(status_code=204)
task.cancel()
return Response(status_code=204)
# ---------------------------------------------------------------------------
# Rewind routes (Task 28).
#
+176
View File
@@ -0,0 +1,176 @@
"""Streaming UX tests (T34): cancel route, recent-dialogue user_turn_edit
inclusion, and the chat-shell embeds the streaming JS hooks.
The cancel route is exercised at the no-op level only — the full mid-stream
cancel path is covered indirectly by T19's CancelledError handling. We
verify here that the route itself is registered and silently 204s when no
in-flight task exists, since the JS Stop button fires unconditionally.
The user_turn_edit inclusion test is the T29 follow-up fix: without it,
the original user_turn drops out of the timeline (correctly) but the
edited prose never lands (incorrectly), so the rendered chat detail is
missing the user's most recent words.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from chat.app import app
from chat.db.connection import open_db
from chat.eventlog.log import append_event
from chat.eventlog.projector import project
@pytest.fixture
def client(tmp_path, monkeypatch):
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
with TestClient(app) as c:
# Disable the lifespan-managed background worker so it doesn't
# try to score significance through Featherless with the fake key.
worker = getattr(app.state, "background_worker", None)
if worker is not None:
worker.enabled = False
yield c
def _seed_chat(
db_path: Path,
bot_id: str = "bot_a",
chat_id: str = "chat_bot_a",
) -> None:
"""Seed a bot + chat with the activity rows the prompt assembler expects."""
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": bot_id,
"name": "BotA",
"persona": "...",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": chat_id,
"host_bot_id": bot_id,
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": bot_id,
"target_id": "you",
"chat_id": chat_id,
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "you",
"target_id": bot_id,
"chat_id": chat_id,
},
)
append_event(
conn,
kind="activity_change",
payload={
"entity_id": "you",
"posture": "sitting",
"action": {"verb": "talking"},
},
)
append_event(
conn,
kind="activity_change",
payload={
"entity_id": bot_id,
"posture": "sitting",
"action": {"verb": "listening"},
},
)
project(conn)
def test_cancel_route_no_op_when_no_in_flight(client, tmp_path):
"""Hitting cancel with nothing streaming returns 204 silently."""
_seed_chat(tmp_path / "test.db")
response = client.post("/chats/chat_bot_a/turns/cancel")
assert response.status_code == 204
def test_user_turn_edit_appears_in_recent_dialogue(client, tmp_path):
"""The chat-detail timeline includes a user_turn_edit's prose.
Original user_turn is superseded by the edit, so it drops out, but
the edit's prose should render in its place.
"""
db_path = tmp_path / "test.db"
_seed_chat(db_path)
with open_db(db_path) as conn:
ut_id = append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "OriginalUserText",
"segments": [],
},
)
edit_id = append_event(
conn,
kind="user_turn_edit",
payload={
"chat_id": "chat_bot_a",
"prose": "EditedUserText",
"supersedes_user_turn_id": ut_id,
},
)
conn.execute(
"UPDATE event_log SET superseded_by = ? WHERE id = ?",
(edit_id, ut_id),
)
conn.commit()
# No project() call — user_turn / user_turn_edit have no projector
# handlers (transcript-only kinds), and re-projecting would replay
# chat_created and trip its UNIQUE constraint.
response = client.get("/chats/chat_bot_a")
assert response.status_code == 200
body = response.text
assert "EditedUserText" in body
# The original (now-superseded) prose must not render.
assert "OriginalUserText" not in body
def test_chat_html_includes_stop_streaming_script(client, tmp_path):
"""The chat shell embeds the streaming-JS hooks (Stop button + send-lock)."""
_seed_chat(tmp_path / "test.db")
response = client.get("/chats/chat_bot_a")
assert response.status_code == 200
body = response.text
# Either the CSS class for the Stop button or the JS state flag must
# appear in the embedded script — both are load-bearing for T34.
assert "stop-streaming" in body or "isStreaming" in body
# Cancel route reference must be wired so the Stop button can call it.
assert "/turns/cancel" in body