456f50d334
Wire the active branch's [origin_event_id, head_event_id] window into every user-facing event/memory reader so switching branches actually changes what dialogue and memories the user sees. Phase 4 T89/T94 shipped branches as metadata-only — this closes the loop. Helper: - chat/state/branches.py: add `active_branch_event_ids(conn)` returning the active branch's id range, with two defensive fall-throughs to `(0, BIG_INT)`: (a) no active branch row at all, and (b) the bootstrap "main" sentinel (name="main", origin=0, head=0). Production never bumps main's head_event_id today, so this preserves existing reader behaviour for every test that doesn't explicitly switch. Readers updated (all user-facing dialogue / retrieval surfaces): - chat/services/turn_common.py::read_recent_dialogue — chat-history prompt context + the chat-view template path (via web/turns.py + web/chat.py). - chat/services/scene_summarize.py::_read_recent_dialogue — scene-close per-POV summary input. - chat/state/memory.py::search_memories — FTS leg filters via m.event_id (T109's column); legacy NULL event_id rows are *included* unconditionally so the filter doesn't break pre-0014 retrieval. The fused (FTS + RRF + vector) path also drops vector hits whose event_id falls outside the branch window. - chat/web/meanwhile.py::_read_recent_meanwhile_dialogue — meanwhile prompt context. Projector queries (chat/state/world.py et al.) and admin/management surfaces (drawer hide-panel, cross-chat search, regenerate's row lookups by id) are intentionally NOT branch-filtered: projection must see the full log to build state correctly, and the admin surfaces operate across branches by design. Tests (10 new, 446 total): - tests/test_branches_state.py: 3 tests for `active_branch_event_ids` itself (bootstrap-main, no-active-branch, non-main literal range). - tests/test_branching.py: 7 cross-feature tests covering the spec's five required scenarios plus scene_summarize and meanwhile readers.
415 lines
16 KiB
Python
415 lines
16 KiB
Python
"""Meanwhile-mode turn controller (T64).

A meanwhile scene is a private 2-bot scene running alongside an active
you-scene (its parent). The user manually advances it by POSTing to the
existing ``/chats/<id>/turns`` endpoint; the route detects an active
meanwhile scene at the start of ``post_turn`` and dispatches here.

Unlike the normal turn flow, "you" is NOT a witness to the scene. The
controller mirrors ``post_turn`` shape but with:

- Speaker alternation derived from the latest meanwhile ``assistant_turn``
  scoped to this scene_id (host first, then alternating).
- Prompt assembly with ``present_set_kind="host_guest"`` so the prompt
  builder drops the "you" activity bullet and any speaker -> "you" edge.
- Memory writes via ``record_meanwhile_memory`` — both bots get rows
  with witness flags ``[you=0, host=1, guest=1]``.
- State updates over exactly 2 directed pairs (host <-> guest); no
  you-related pairs fire.
- The ``assistant_turn`` payload carries ``meanwhile_scene_id`` and
  ``present_set_kind="host_guest"`` so downstream filters (alternation
  lookup, drawer rendering, scene-close detection) can scope to the
  meanwhile slice without conflating it with the parent you-scene's
  history.

Scene-close detection for meanwhile scenes is not auto-fired here —
T65 covers the close + digest pipeline. The controller's job ends
after the post-turn classifier passes land.
"""
|
|
|
|
from __future__ import annotations

import asyncio
import json
import logging

from chat.config import Settings
from chat.eventlog.log import append_and_apply, append_event
from chat.llm.client import LLMClient
from chat.services.memory_write import record_meanwhile_memory
from chat.services.multi_state_update import compute_state_updates_for_present
from chat.services.prompt import assemble_narrative_prompt
from chat.services.turn_parse import parse_turn
from chat.state.edges import get_edge
from chat.state.entities import get_bot
from chat.state.meanwhile import list_meanwhile_scenes
from chat.state.world import get_chat
from chat.web.pubsub import publish
from chat.web.render import render_turn_html as _render_turn_html
|
|
|
|
|
|
def _strip_ooc_for_prompt(parsed) -> str:
|
|
"""Mirror of the helper in turns.py — concatenate non-OOC segments."""
|
|
keep = [s.text for s in parsed.segments if s.kind != "ooc"]
|
|
return " ".join(keep).strip()
|
|
|
|
|
|
def _read_recent_meanwhile_dialogue(
    conn, chat_id: str, scene_id: int, limit: int = 50
) -> list[dict]:
    """Return the meanwhile scene's prior turns shaped as
    ``{"speaker": <id>, "text": <prose>}``, oldest first.

    Pulls ``user_turn`` / ``user_turn_edit`` rows for the chat (the
    user-side prose driving this meanwhile scene rides through the same
    chat) plus only those ``assistant_turn`` rows whose
    ``meanwhile_scene_id`` matches the given scene id. Other meanwhile
    scenes on the same chat — and the parent you-scene's assistant_turns —
    are excluded so the prompt context stays scoped to the private beat.

    Filters chat_id (and meanwhile_scene_id for assistant_turn) via
    ``json_extract`` in SQL so the ``LIMIT`` applies to rows that already
    match, rather than filtering in Python after an unbounded read. The
    user-side rows match on chat_id only: this query does not look at
    ``meanwhile_scene_id`` for them, even though ``process_meanwhile_turn``
    does write that key on the ``user_turn`` rows it appends.
    NOTE(review): that means user turns from the parent you-scene (or
    another meanwhile scene) on the same chat are also returned — confirm
    this is intended.

    T113: clamp by the active branch's ``[origin, head]`` event-id range
    so meanwhile prompt context respects the user's current branch.
    Bootstrap-main and "no active branch" both fall through to ``(0,
    BIG_INT)`` — no functional change for the metadata-only Phase 4 era.

    Superseded and hidden rows are skipped. User-side rows are reported
    with speaker ``"you"`` and their ``prose``; assistant rows with their
    ``speaker_id`` (default ``"bot"``) and ``text``. At most ``limit`` of
    the most recent matching rows are returned.
    """
    # Function-scope import, as in the original module layout.
    from chat.state.branches import active_branch_event_ids

    origin, head = active_branch_event_ids(conn)
    cur = conn.execute(
        "SELECT id, kind, payload_json FROM event_log "
        "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
        " AND superseded_by IS NULL AND hidden = 0 "
        " AND id BETWEEN ? AND ? "
        " AND json_extract(payload_json, '$.chat_id') = ? "
        " AND ("
        " kind IN ('user_turn', 'user_turn_edit') "
        " OR json_extract(payload_json, '$.meanwhile_scene_id') = ?"
        " ) "
        "ORDER BY id DESC LIMIT ?",
        (origin, head, chat_id, scene_id, limit),
    )
    rows = cur.fetchall()
    # The query walks newest-first so LIMIT keeps the latest rows; flip
    # back to chronological order for the prompt.
    rows.reverse()
    out: list[dict] = []
    for _row_id, kind, payload_json in rows:
        p = json.loads(payload_json)
        if kind in ("user_turn", "user_turn_edit"):
            out.append({"speaker": "you", "text": p.get("prose", "")})
        else:
            out.append(
                {
                    "speaker": p.get("speaker_id", "bot"),
                    "text": p.get("text", ""),
                }
            )
    return out
|
|
|
|
|
|
def _last_meanwhile_speaker(conn, chat_id: str, scene_id: int) -> str | None:
|
|
"""Return the speaker_id of the latest assistant_turn linked to
|
|
``scene_id`` for ``chat_id``, or ``None`` if no prior turn exists.
|
|
|
|
Used to alternate the speaker across consecutive meanwhile turns —
|
|
the OTHER bot speaks next. Pushes both filters into SQL via
|
|
``json_extract`` and bounds with ``LIMIT 1`` so SQLite stops at the
|
|
first match instead of scanning the whole assistant_turn slice.
|
|
"""
|
|
row = conn.execute(
|
|
"SELECT json_extract(payload_json, '$.speaker_id') AS speaker "
|
|
"FROM event_log "
|
|
"WHERE kind = 'assistant_turn' "
|
|
" AND superseded_by IS NULL AND hidden = 0 "
|
|
" AND json_extract(payload_json, '$.chat_id') = ? "
|
|
" AND json_extract(payload_json, '$.meanwhile_scene_id') = ? "
|
|
"ORDER BY id DESC "
|
|
"LIMIT 1",
|
|
(chat_id, scene_id),
|
|
).fetchone()
|
|
return row[0] if row else None
|
|
|
|
|
|
async def process_meanwhile_turn(
    conn,
    client: LLMClient,
    settings: Settings,
    *,
    chat_id: str,
    prose: str,
    app=None,
) -> dict:
    """Run one meanwhile turn end-to-end.

    ``conn`` is the database connection handed to every state/event-log
    helper; ``client`` drives both the classifier parse and the narrative
    stream; ``settings`` supplies the model names, budgets, sampling and
    timeout values read below; ``prose`` is the raw user text for this
    beat; ``app`` is forwarded unchanged to ``record_meanwhile_memory``.

    Returns a small dict shape ``{"assistant_text": ..., "speaker_id":
    ..., "scene_id": ..., "user_turn_id": ..., "assistant_event_id":
    ...}`` so callers can introspect the produced beat (HTTP route maps
    to a ``204``; future SSE rebroadcast may use the dict directly).

    Raises ``ValueError`` when the chat does not exist, when there is no
    active meanwhile scene on ``chat_id``, when the chat has no guest bot,
    or when either bot row is missing — the caller (turns.py) only
    dispatches here after a positive ``list_meanwhile_scenes`` lookup, but
    the defensive raises keep the controller usable in isolation.

    Raises ``asyncio.CancelledError`` when the stream was cancelled, but
    only after the partial ``assistant_turn`` has been appended and the
    completion events published. Any other streaming failure is logged and
    recorded as a truncated turn rather than propagated.
    """
    chat = get_chat(conn, chat_id)
    if chat is None:
        raise ValueError(f"chat not found: {chat_id}")

    scenes = list_meanwhile_scenes(conn, chat_id, status="active")
    if not scenes:
        raise ValueError(f"no active meanwhile scene on chat: {chat_id}")
    scene = scenes[0]
    scene_id = scene["id"]

    host_bot_id = chat["host_bot_id"]
    guest_bot_id = chat.get("guest_bot_id")
    if guest_bot_id is None:
        # A meanwhile scene without a guest is a schema violation —
        # the projector requires both ids on meanwhile_scene_started.
        raise ValueError(
            f"meanwhile scene {scene_id} on chat {chat_id} lacks a guest"
        )

    host_bot = get_bot(conn, host_bot_id)
    guest_bot = get_bot(conn, guest_bot_id)
    if host_bot is None or guest_bot is None:
        raise ValueError(
            f"meanwhile bots missing: host={host_bot_id} guest={guest_bot_id}"
        )

    # 1. Parse the user prose with the classifier — same shape as the
    # normal turn flow so OOC-stripping, segment-typing, etc. all work.
    parsed = await parse_turn(
        client, model=settings.classifier_model, prose=prose
    )
    prompt_prose = _strip_ooc_for_prompt(parsed)

    # 2. Append user_turn — kept on the chat-wide log so the user can
    # see their own prose in the timeline. Tagged with the meanwhile
    # scene_id so future renderers can group it with the right scene.
    user_turn_event_id = append_event(
        conn,
        kind="user_turn",
        payload={
            "chat_id": chat_id,
            "prose": prose,
            "segments": [s.model_dump() for s in parsed.segments],
            "meanwhile_scene_id": scene_id,
        },
    )

    # 3. Alternate the speaker. First turn -> host speaks; each
    # subsequent turn -> the OTHER bot from the previous beat. Lookup
    # is scoped by ``meanwhile_scene_id`` so unrelated assistant_turns
    # on the same chat don't perturb the alternation.
    last_speaker = _last_meanwhile_speaker(conn, chat_id, scene_id)
    if last_speaker is None or last_speaker == guest_bot_id:
        speaker_bot = host_bot
        addressee_bot = guest_bot
    else:
        speaker_bot = guest_bot
        addressee_bot = host_bot

    # 4. Placeholder marker so SSE observers see "in flight". No
    # projector handler is registered for this kind — it's transcript-
    # only, same as the normal turn flow.
    append_event(
        conn,
        kind="assistant_turn_started",
        payload={
            "chat_id": chat_id,
            "speaker_id": speaker_bot["id"],
            "user_turn_id": user_turn_event_id,
            "meanwhile_scene_id": scene_id,
        },
    )

    # 5. Build the narrative prompt. ``present_set_kind="host_guest"``
    # tells the assembler to drop the "you" activity bullet and any
    # speaker -> "you" edge — both irrelevant inside a private beat.
    # Addressee is the OTHER bot, not "you".
    recent_dialogue = _read_recent_meanwhile_dialogue(conn, chat_id, scene_id)
    # The user_turn appended in step 2 is already in the log; drop it from
    # the history because its prose is passed separately below.
    if recent_dialogue and recent_dialogue[-1].get("speaker") == "you":
        recent_dialogue = recent_dialogue[:-1]
    messages = assemble_narrative_prompt(
        conn,
        chat_id=chat_id,
        speaker_bot_id=speaker_bot["id"],
        addressee=addressee_bot["id"],
        user_turn_prose=prompt_prose if prompt_prose else None,
        recent_dialogue=recent_dialogue,
        budget_soft=settings.narrative_budget_soft,
        budget_hard=settings.narrative_budget_hard,
        guest_id=guest_bot_id,
        present_set_kind="host_guest",
    )

    # 6. Stream + accumulate. Same SSE pattern as the normal flow —
    # tokens publish under the speaker's id so the UI can label the
    # right bubble. Register the streaming task in the chat-keyed
    # in-flight registry so POST /chats/<id>/turns/cancel can call
    # ``.cancel()`` on it; without this, the Stop button is a no-op for
    # meanwhile beats. We import the underscore name from turns.py
    # deliberately — it's the same single-process registry the cancel
    # route reads, and exposing it via a public alias would require
    # touching every existing call site for no behavioural gain.
    from chat.web.turns import _in_flight_tasks  # noqa: PLC0415

    accumulated: list[str] = []
    truncated = False
    cancelled = False

    async def _stream() -> None:
        async for chunk in client.stream(
            messages,
            model=settings.narrative_model,
            max_tokens=settings.narrative_max_tokens,
            temperature=settings.narrative_temperature,
        ):
            accumulated.append(chunk)
            await publish(
                chat_id,
                {
                    "event": "token",
                    "text": chunk,
                    "speaker_id": speaker_bot["id"],
                },
            )

    stream_task = asyncio.create_task(_stream())
    _in_flight_tasks[chat_id] = stream_task
    try:
        await stream_task
    except asyncio.CancelledError:
        truncated = True
        cancelled = True
    except Exception:
        # Best-effort: keep whatever was streamed and record the turn as
        # truncated, but leave a traceback in the log so the failure is
        # not silent.
        logging.getLogger(__name__).exception(
            "meanwhile stream failed for chat %s (scene %s)",
            chat_id,
            scene_id,
        )
        truncated = True
    finally:
        # Always unregister so a subsequent turn can register a fresh
        # task. Mirrors the cleanup in turns.py::post_turn.
        _in_flight_tasks.pop(chat_id, None)

    text = "".join(accumulated)

    # 7. Append assistant_turn — tagged with meanwhile_scene_id so the
    # next turn's alternation lookup can find it, and present_set_kind
    # so downstream renderers / digesters can filter scope.
    assistant_event_id = append_event(
        conn,
        kind="assistant_turn",
        payload={
            "chat_id": chat_id,
            "speaker_id": speaker_bot["id"],
            "text": text,
            "truncated": truncated,
            "user_turn_id": user_turn_event_id,
            "meanwhile_scene_id": scene_id,
            "present_set_kind": "host_guest",
        },
    )

    # 8. Per-turn memory writes — both bots get a row with witness flags
    # [you=0, host=1, guest=1]. Skipped on cancellation so we don't
    # record memory for a partial beat the user never read.
    if not cancelled and text.strip():
        record_meanwhile_memory(
            conn,
            chat_id=chat_id,
            host_bot_id=host_bot_id,
            guest_bot_id=guest_bot_id,
            narrative_text=text,
            scene_id=scene_id,
            chat_clock_at=chat.get("time"),
            app=app,
        )

    # 9. Post-turn state-update — exactly 2 directed pairs over the
    # bot pair. No you-related pairs fire (you isn't present).
    present_ids = [host_bot_id, guest_bot_id]
    present_names = {
        host_bot_id: host_bot["name"],
        guest_bot_id: guest_bot["name"],
    }
    personas = {
        host_bot_id: host_bot.get("persona") or "",
        guest_bot_id: guest_bot.get("persona") or "",
    }
    prior_edges: dict[tuple[str, str], dict] = {}
    for src in present_ids:
        for tgt in present_ids:
            if src == tgt:
                continue
            # Missing edge rows fall back to a neutral default.
            edge = get_edge(conn, src, tgt) or {
                "affinity": 50,
                "trust": 50,
                "summary": "",
            }
            prior_edges[(src, tgt)] = edge

    state_updates = await compute_state_updates_for_present(
        client,
        classifier_model=settings.classifier_model,
        present_ids=present_ids,
        present_names=present_names,
        personas=personas,
        prior_edges=prior_edges,
        recent_dialogue=recent_dialogue,
        timeout_s=settings.classifier_timeout_s,
    )
    last_at = chat.get("time")
    for src_id, tgt_id, update in state_updates:
        append_and_apply(
            conn,
            kind="edge_update",
            payload={
                "source_id": src_id,
                "target_id": tgt_id,
                "chat_id": chat_id,
                "affinity_delta": update.affinity_delta,
                "trust_delta": update.trust_delta,
                "knowledge_facts": update.knowledge_facts,
                "last_interaction_at": last_at,
                "last_interaction_chat_id": chat_id,
            },
        )

    # 10. SSE broadcast for the timeline UI — completion event + an HTML
    # fragment for the HTMX SSE swap. Same pattern as the normal turn
    # flow so the rendered transcript shows the meanwhile beat inline.
    await publish(
        chat_id,
        {
            "event": "assistant_turn_complete",
            "speaker_id": speaker_bot["id"],
            "text": text,
            "truncated": truncated,
        },
    )
    turn_html = _render_turn_html(
        speaker_bot["name"],
        text,
        role="bot",
        event_id=assistant_event_id,
    )
    await publish(chat_id, {"event": "turn_html", "data": turn_html})

    if cancelled:
        # Re-raise after the partial-turn has been recorded so callers
        # see the cancel propagate (mirrors normal turn flow).
        raise asyncio.CancelledError

    return {
        "assistant_text": text,
        "speaker_id": speaker_bot["id"],
        "scene_id": scene_id,
        "user_turn_id": user_turn_event_id,
        "assistant_event_id": assistant_event_id,
    }
|
|
|
|
|
|
# Public API of this module: only the turn controller is exported.
__all__ = ["process_meanwhile_turn"]