refactor: extract turn_common helpers from regenerate + turns (T83.2)

The recent-dialogue read and the directed-pair edge gather were
duplicated between ``chat.services.regenerate`` and ``chat.web.turns``.
Extracted into ``chat.services.turn_common`` with two helpers:

- ``read_recent_dialogue(conn, chat_id, *, limit, exclude_event_id)``:
  oldest-first ``[{speaker, text}]`` over user_turn / user_turn_edit /
  assistant_turn rows, with the standard ``superseded_by IS NULL AND
  hidden = 0`` filter. ``exclude_event_id`` covers regenerate's need to
  drop the original assistant_turn before its supersede UPDATE lands.
- ``gather_prior_edges(conn, present_ids)``: ``{(src, tgt): edge}`` over
  every directed pair across ``present_ids``, with the schema default
  50/50 baseline for missing rows.

``chat.web.turns._read_recent_dialogue`` becomes a thin delegate so the
chat-detail template and other in-module callers keep their import
shape; ``_gather_state_update_inputs`` now calls into the shared edge
gather. ``regenerate_assistant_turn`` calls the helpers at four call
sites (primary + post-interjection edges, primary + interjection
recent reads), still post-processing speaker ids to display names for
its prompts.

Decision: ``chat.services.scene_summarize._read_recent_dialogue`` is
left in place — it has a ``since_event_id`` clamp (T80.2) and excludes
``user_turn_edit`` deliberately. Folding it into the shared helper
would either silently change its read shape or require a second flag,
both more invasive than the duplication. Documented in the new module
docstring.

Tests: tests/test_turn_common.py covers chronological ordering,
supersede / other-chat / exclude_event_id filtering, and prior-edge
default-fallback. Existing 6 regenerate + 18 turn_flow tests pass
unchanged.
This commit is contained in:
Joseph Doherty
2026-04-26 22:14:59 -04:00
parent f2fd30c5a9
commit d833bbc3e7
4 changed files with 389 additions and 110 deletions
+44 -68
View File
@@ -80,6 +80,10 @@ from chat.services.interjection import detect_interjection
from chat.services.memory_write import record_turn_memory_for_present
from chat.services.multi_state_update import compute_state_updates_for_present
from chat.services.prompt import assemble_narrative_prompt
from chat.services.turn_common import (
gather_prior_edges,
read_recent_dialogue,
)
from chat.state.edges import get_edge
from chat.state.entities import get_bot, get_you
from chat.state.events import list_active_events
@@ -209,33 +213,30 @@ async def regenerate_assistant_turn(
# assistant_turn explicitly (we haven't superseded it yet — that
# update lands at the end so the new event_id is known) and use the
# standard ``superseded_by IS NULL AND hidden = 0`` filter so any
# prior regenerates also drop out.
# prior regenerates also drop out. T83.2: shared helper handles the
# SQL + filtering; we post-process to map speaker ids to display
# names for the prompt.
you_entity = get_you(conn) or {"name": "you", "persona": ""}
you_name = you_entity.get("name", "you")
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND id != ? "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT 20",
(original_assistant_event_id,),
raw_recent = read_recent_dialogue(
conn,
chat_id,
limit=20,
exclude_event_id=original_assistant_event_id,
)
rows = list(reversed(cur.fetchall()))
recent: list[dict] = []
for _eid, kind, payload_json in rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
for entry in raw_recent:
spk = entry.get("speaker", "bot")
if spk == "you":
recent.append({"speaker": you_name, "text": entry.get("text", "")})
continue
if kind in ("user_turn", "user_turn_edit"):
recent.append({"speaker": you_name, "text": p.get("prose", "")})
else:
spk = p.get("speaker_id", "bot")
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
elif guest_bot is not None and spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
recent.append({"speaker": spk_name, "text": p.get("text", "")})
elif guest_bot is not None and spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
else:
spk_name = host_bot.get("name", "bot")
recent.append({"speaker": spk_name, "text": entry.get("text", "")})
# 4. Assemble the narrative prompt. ``recent`` already excludes the
# current user prose, which we pass through ``user_turn_prose``.
@@ -373,17 +374,8 @@ async def regenerate_assistant_turn(
present_names[guest_bot_id] = guest_bot.get("name", "bot")
personas[guest_bot_id] = guest_bot.get("persona") or ""
prior_edges: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges[(src, tgt)] = edge
# T83.2: shared helper builds the directed-pair edge dict.
prior_edges = gather_prior_edges(conn, present_ids)
state_updates = await compute_state_updates_for_present(
client,
@@ -472,34 +464,27 @@ async def regenerate_assistant_turn(
)
if decision.should_interject:
# Re-read recent so the just-appended primary is in the prompt.
interject_cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT 20",
)
interject_rows = list(reversed(interject_cur.fetchall()))
# Re-read recent so the just-appended primary is in the
# prompt. T83.2: shared helper + the same id->name mapping
# as the primary read above.
raw_interject = read_recent_dialogue(conn, chat_id, limit=20)
interject_recent: list[dict] = []
for _eid, kind, payload_json in interject_rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
for entry in raw_interject:
spk = entry.get("speaker", "bot")
if spk == "you":
interject_recent.append(
{"speaker": you_name, "text": entry.get("text", "")}
)
continue
if kind in ("user_turn", "user_turn_edit"):
interject_recent.append(
{"speaker": you_name, "text": p.get("prose", "")}
)
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
elif spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
else:
spk = p.get("speaker_id", "bot")
if spk == host_bot_id:
spk_name = host_bot.get("name", "bot")
elif spk == guest_bot.get("id"):
spk_name = guest_bot.get("name", "bot")
else:
spk_name = "bot"
interject_recent.append(
{"speaker": spk_name, "text": p.get("text", "")}
)
spk_name = "bot"
interject_recent.append(
{"speaker": spk_name, "text": entry.get("text", "")}
)
if interject_recent and interject_recent[-1].get("speaker") == you_name:
interject_recent = interject_recent[:-1]
@@ -603,17 +588,8 @@ async def regenerate_assistant_turn(
"text": interject_text,
}
]
prior_edges_post: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges_post[(src, tgt)] = edge
# T83.2: shared helper handles the directed-pair edge dict.
prior_edges_post = gather_prior_edges(conn, present_ids)
state_updates_post = await compute_state_updates_for_present(
client,
+118
View File
@@ -0,0 +1,118 @@
"""Shared helpers for turn flows (T83.2).
Both ``chat.web.turns.post_turn`` and
``chat.services.regenerate.regenerate_assistant_turn`` need to:
1. Pull a chronological tail of user-side and assistant_turn events for
prompt assembly + state-update inputs.
2. Build a directed-edge dict over a fixed set of "present" entity ids
for the multi-pair state-update pass (with the schema 50/50 default
filled in for missing rows).
Before T83.2 each call site had its own copy of these blocks. The two
copies drifted on details (T73.1 added ``user_turn_edit`` handling to
turns.py; regenerate.py had a slightly different recent-window query).
This module is the single source so a future change to either lands in
both flows by construction.
Note on overlap with ``chat.services.scene_summarize._read_recent_dialogue``:
that helper has a ``since_event_id`` clamp (T80.2 thread-detection
scope) and intentionally does NOT include ``user_turn_edit`` events —
its callers want the *original* prose, not edits. Deduplicating it
into here would either (a) require a new flag on the shared helper for
``user_turn_edit`` inclusion, or (b) silently change scene_summarize's
read shape. Both options are more invasive than the duplication is
costly, so that helper is left alone for now.
"""
from __future__ import annotations
import json
from sqlite3 import Connection
from chat.state.edges import get_edge
def read_recent_dialogue(
    conn: Connection,
    chat_id: str,
    *,
    limit: int = 50,
    exclude_event_id: int | None = None,
) -> list[dict]:
    """Return the newest ``limit`` dialogue events of ``chat_id``, oldest first.

    Each entry is shaped ``{"speaker": <id-or-"you">, "text": <prose>}``.
    ``user_turn`` and ``user_turn_edit`` rows are reported with speaker
    ``"you"`` and their payload ``prose``; ``assistant_turn`` rows use the
    payload ``speaker_id`` (default ``"bot"``) and ``text``.

    Only rows with ``superseded_by IS NULL AND hidden = 0`` are read, so
    superseded (regenerated / edited-over) and hidden rows drop out.

    Args:
        conn: Open SQLite connection holding the ``event_log`` table.
        chat_id: Chat whose events are wanted. The chat id is stored in
            the JSON payload, so it is matched in Python, not in SQL.
        limit: Maximum number of entries to return. The cap is applied
            *after* the ``chat_id`` filter, so events from other chats do
            not eat into this chat's window. ``0`` yields an empty list;
            a negative value means "no cap" (same as SQLite's ``LIMIT``).
        exclude_event_id: Optional ``event_log.id`` to skip. Regenerate
            uses it to drop the original assistant_turn before that row
            has been marked superseded.

    Returns:
        Chronologically ordered (oldest first) list of entry dicts.
    """
    if limit == 0:
        return []

    conditions = [
        "kind IN ('user_turn', 'user_turn_edit', 'assistant_turn')",
        "superseded_by IS NULL",
        "hidden = 0",
    ]
    params: list = []
    if exclude_event_id is not None:
        conditions.append("id != ?")
        params.append(exclude_event_id)
    query = (
        "SELECT id, kind, payload_json FROM event_log WHERE "
        + " AND ".join(conditions)
        + " ORDER BY id DESC"
    )

    newest_first: list[dict] = []
    cur = conn.execute(query, params)
    try:
        # Stream newest-first and stop as soon as this chat's window is
        # full. A SQL LIMIT would count rows from every chat and could
        # starve the requested one.
        for _row_id, kind, payload_json in cur:
            payload = json.loads(payload_json)
            if payload.get("chat_id") != chat_id:
                continue
            if kind in ("user_turn", "user_turn_edit"):
                entry = {"speaker": "you", "text": payload.get("prose", "")}
            else:
                entry = {
                    "speaker": payload.get("speaker_id", "bot"),
                    "text": payload.get("text", ""),
                }
            newest_first.append(entry)
            if len(newest_first) == limit:
                break
    finally:
        # We may stop mid-result-set; release the statement right away.
        cur.close()

    newest_first.reverse()  # back to chronological order
    return newest_first
def gather_prior_edges(
    conn: Connection, present_ids: list[str]
) -> dict[tuple[str, str], dict]:
    """Collect the stored edge for each ordered pair of present entities.

    Every ``(src, tgt)`` combination drawn from ``present_ids`` with
    ``src != tgt`` is looked up through ``get_edge``. When that lookup
    returns a falsy result, the pair is seeded with the schema default
    baseline: affinity 50, trust 50, empty summary.

    Args:
        conn: Open SQLite connection handed through to ``get_edge``.
        present_ids: Entity ids taking part in the scene.

    Returns:
        Mapping of ``(src, tgt)`` to that directed pair's edge dict;
        empty when fewer than two distinct ids are present.
    """
    return {
        (source_id, target_id): get_edge(conn, source_id, target_id)
        or {"affinity": 50, "trust": 50, "summary": ""}
        for source_id in present_ids
        for target_id in present_ids
        if source_id != target_id
    }
__all__ = ["read_recent_dialogue", "gather_prior_edges"]
+12 -42
View File
@@ -71,6 +71,10 @@ from chat.services.prompt import (
from chat.services.rewind import compute_rewind_preview, execute_rewind
from chat.services.scene_close import detect_scene_close
from chat.services.scene_summarize import apply_scene_close_summary
from chat.services.turn_common import (
gather_prior_edges,
read_recent_dialogue,
)
from chat.services.turn_parse import ParsedTurn, parse_turn
from chat.state.edges import get_edge
from chat.state.entities import get_bot, get_you
@@ -113,38 +117,13 @@ def _strip_ooc_for_prompt(parsed: ParsedTurn) -> str:
def _read_recent_dialogue(conn, chat_id: str, limit: int = 200) -> list[dict]:
"""Return user-side and assistant_turn events for ``chat_id``.
Includes ``user_turn``, ``user_turn_edit`` (T29 edited prose), and
``assistant_turn``. Ordered oldest-first; superseded/hidden rows are
skipped so regenerated turns (T29) drop out of the rendered timeline.
Each entry is shaped ``{"speaker": <id-or-"you">, "text": <prose>}``
for the prompt assembler and the chat-detail template.
T83.2: thin delegate over
:func:`chat.services.turn_common.read_recent_dialogue` so post_turn
and regenerate share one implementation. The wrapper survives so
the chat-detail template and other callers in this module don't all
have to update at once.
"""
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
)
rows = cur.fetchall()
rows.reverse() # back to chronological order
out: list[dict] = []
for _row_id, kind, payload_json in rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
continue
if kind in ("user_turn", "user_turn_edit"):
# Edited prose substitutes for the original user_turn (the
# original is marked superseded_by and filtered above).
out.append({"speaker": "you", "text": p.get("prose", "")})
else:
out.append(
{
"speaker": p.get("speaker_id", "bot"),
"text": p.get("text", ""),
}
)
return out
return read_recent_dialogue(conn, chat_id, limit=limit)
def _detect_addressee_id(
@@ -211,17 +190,8 @@ def _gather_state_update_inputs(
present_names[guest_bot["id"]] = guest_bot["name"]
personas[guest_bot["id"]] = guest_bot.get("persona") or ""
prior_edges: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges[(src, tgt)] = edge
# T83.2: directed-edge gather is shared with regenerate.py.
prior_edges = gather_prior_edges(conn, present_ids)
return present_ids, present_names, personas, prior_edges