Files
chat/chat/services/turn_common.py
T

126 lines
4.5 KiB
Python

"""Shared helpers for turn flows (T83.2).
Both ``chat.web.turns.post_turn`` and
``chat.services.regenerate.regenerate_assistant_turn`` need to:
1. Pull a chronological tail of user-side and assistant_turn events for
prompt assembly + state-update inputs.
2. Build a directed-edge dict over a fixed set of "present" entity ids
for the multi-pair state-update pass (with the schema 50/50 default
filled in for missing rows).
Before T83.2 each call site had its own copy of these blocks. The two
copies drifted on details (T73.1 added ``user_turn_edit`` handling to
turns.py; regenerate.py had a slightly different recent-window query).
This module is the single source so a future change to either lands in
both flows by construction.
Note on overlap with ``chat.services.scene_summarize._read_recent_dialogue``:
that helper has a ``since_event_id`` clamp (T80.2 thread-detection
scope) and intentionally does NOT include ``user_turn_edit`` events —
its callers want the *original* prose, not edits. Deduplicating it
into here would either (a) require a new flag on the shared helper for
``user_turn_edit`` inclusion, or (b) silently change scene_summarize's
read shape. Both feel more invasive than the duplication is bad, so
that helper is left alone for now.
"""
from __future__ import annotations
import json
from sqlite3 import Connection
from chat.state.edges import get_edge
def read_recent_dialogue(
conn: Connection,
chat_id: str,
*,
limit: int = 50,
exclude_event_id: int | None = None,
) -> list[dict]:
"""Pull the last ``limit`` user-side / assistant_turn events for
``chat_id`` as ``[{"speaker": <id-or-"you">, "text": <prose>}]``,
chronologically ordered (oldest first).
Filters: ``superseded_by IS NULL AND hidden = 0`` — regenerated
rows drop out so the timeline reflects the current state. Includes
``user_turn``, ``user_turn_edit`` (T29 edited prose substitutes for
the original — the original is marked superseded above), and
``assistant_turn`` rows.
``exclude_event_id`` is an optional event_log id to skip — used by
regenerate to drop the original assistant_turn from its prompt
context window before that row has been marked superseded (the
supersede UPDATE lands at the end so the new event_id is known).
"""
if exclude_event_id is None:
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(limit,),
)
else:
cur = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') "
" AND id != ? "
" AND superseded_by IS NULL AND hidden = 0 "
"ORDER BY id DESC LIMIT ?",
(exclude_event_id, limit),
)
rows = list(reversed(cur.fetchall()))
out: list[dict] = []
for row_id, kind, payload_json in rows:
p = json.loads(payload_json)
if p.get("chat_id") != chat_id:
continue
if kind in ("user_turn", "user_turn_edit"):
out.append(
{
"speaker": "you",
"text": p.get("prose", ""),
"event_id": row_id,
}
)
else:
out.append(
{
"speaker": p.get("speaker_id", "bot"),
"text": p.get("text", ""),
"event_id": row_id,
}
)
return out
def gather_prior_edges(
conn: Connection, present_ids: list[str]
) -> dict[tuple[str, str], dict]:
"""Build ``{(src, tgt): {affinity, trust, summary}}`` for every
directed pair where both ``src`` and ``tgt`` are in ``present_ids``
and ``src != tgt``.
Missing rows fall back to the schema default 50/50 baseline (mirrors
the Phase 1 single-pair flow). Used by post_turn and regenerate to
seed the multi-pair state-update classifier.
"""
prior_edges: dict[tuple[str, str], dict] = {}
for src in present_ids:
for tgt in present_ids:
if src == tgt:
continue
edge = get_edge(conn, src, tgt) or {
"affinity": 50,
"trust": 50,
"summary": "",
}
prior_edges[(src, tgt)] = edge
return prior_edges
__all__ = ["read_recent_dialogue", "gather_prior_edges"]