555 lines
19 KiB
Python
555 lines
19 KiB
Python
"""Narrative-prompt assembly with must/should/nice trim tiers.
|
|
|
|
Implements Task 18 (Phase 1D). See Requirements §3.2 (token budgets and
|
|
trim tiers) and §6.3 (speaker prompt assembly order). The function
|
|
:func:`assemble_narrative_prompt` returns a list of
|
|
:class:`chat.llm.client.Message` objects ready to feed to
|
|
``LLMClient.generate``.
|
|
|
|
Trim policy when the assembled prompt exceeds the soft target:
|
|
|
|
- **MUST-include** (never trimmed): system / speaker identity, the
|
|
speaker→addressee edge, the activity snapshot for all present
|
|
entities, the current scene description, and the last 4 turns of
|
|
dialogue.
|
|
- **SHOULD-include** (trim when over budget): other edges of the
|
|
speaker. (Group nodes, active threads, and active events / props are
|
|
Phase 3 — skipped here.)
|
|
- **NICE-include** (trim first): retrieved memories beyond top-2,
|
|
dialogue turns beyond the last 4 (replaced with a one-line elision
|
|
placeholder), per-POV summary of the previous scene.
|
|
|
|
Token counting uses ``tiktoken.get_encoding("cl100k_base")`` per the
|
|
requirements. Mistral / Llama tokenizers diverge ~5%; we accept the
|
|
drift.
|
|
|
|
The function is intentionally deterministic (no LLM call) so it is
|
|
testable with synthetic state and so T29's regenerate flow can rebuild
|
|
prompts without re-running classifiers.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from sqlite3 import Connection
|
|
|
|
import tiktoken
|
|
|
|
from chat.llm.client import Message
|
|
from chat.state.edges import get_edge, list_edges_for
|
|
from chat.state.entities import get_bot, get_you
|
|
from chat.state.memory import search_memories
|
|
from chat.state.world import (
|
|
active_scene,
|
|
get_activity,
|
|
get_chat,
|
|
get_container,
|
|
get_scene,
|
|
)
|
|
|
|
|
|
# Cache the encoder once at import-time. tiktoken's encoder load is
|
|
# non-trivial (~tens of ms) and the encoding is process-wide stable.
|
|
_ENCODER = tiktoken.get_encoding("cl100k_base")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _count_tokens(text: str, encoding=_ENCODER) -> int:
|
|
"""Return the cl100k_base token count for ``text`` (0 for falsy)."""
|
|
if not text:
|
|
return 0
|
|
return len(encoding.encode(text))
|
|
|
|
|
|
def _build_speaker_identity(bot: dict) -> str:
|
|
"""Render the bot identity block. Skips empty optional fields."""
|
|
lines = [f"You are {bot['name']}."]
|
|
if bot.get("persona"):
|
|
lines.append("")
|
|
lines.append("PERSONA:")
|
|
lines.append(bot["persona"])
|
|
voice_samples = bot.get("voice_samples") or []
|
|
if voice_samples:
|
|
lines.append("")
|
|
lines.append("VOICE REFERENCE:")
|
|
lines.append("\n---\n".join(voice_samples))
|
|
traits = bot.get("traits") or []
|
|
if traits:
|
|
lines.append("")
|
|
lines.append(f"TRAITS: {', '.join(traits)}")
|
|
if bot.get("backstory"):
|
|
lines.append("")
|
|
lines.append("BACKSTORY:")
|
|
lines.append(bot["backstory"])
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_edge_block(edge: dict | None, addressee_name: str) -> str | None:
|
|
"""Render the speaker → addressee edge. Returns None when no edge exists."""
|
|
if edge is None:
|
|
return None
|
|
lines = [f"YOUR EDGE TO {addressee_name}:"]
|
|
lines.append(f"- Affinity: {edge.get('affinity', 50)}/100")
|
|
lines.append(f"- Trust: {edge.get('trust', 50)}/100")
|
|
summary = edge.get("summary") or ""
|
|
if summary:
|
|
lines.append(f"- Summary: {summary}")
|
|
knowledge = edge.get("knowledge") or []
|
|
if knowledge:
|
|
lines.append(f"- What you know about {addressee_name}:")
|
|
for fact in knowledge:
|
|
lines.append(f" * {fact}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_activity_block(activities: list[dict]) -> str | None:
|
|
"""Render the activity snapshot for all present entities."""
|
|
rendered: list[str] = []
|
|
for a in activities:
|
|
if a is None:
|
|
continue
|
|
label = a.get("_display_name") or a.get("entity_id", "?")
|
|
parts: list[str] = []
|
|
posture = a.get("posture") or ""
|
|
if posture:
|
|
parts.append(posture)
|
|
action = a.get("action") or {}
|
|
verb = action.get("verb") if isinstance(action, dict) else None
|
|
if verb:
|
|
parts.append(verb)
|
|
attention = a.get("attention") or ""
|
|
if attention:
|
|
parts.append(f"attention: {attention}")
|
|
holding = a.get("holding") or []
|
|
if holding:
|
|
parts.append(f"holding: {', '.join(holding)}")
|
|
if parts:
|
|
rendered.append(f"- {label}: " + ", ".join(parts))
|
|
else:
|
|
rendered.append(f"- {label}: (no activity recorded)")
|
|
if not rendered:
|
|
return None
|
|
return "ACTIVITIES:\n" + "\n".join(rendered)
|
|
|
|
|
|
def _build_scene_block(chat: dict, container: dict | None, scene: dict | None) -> str | None:
|
|
"""Render the current-scene block. Always present when chat exists."""
|
|
lines = ["CURRENT SCENE:"]
|
|
if container is not None:
|
|
lines.append(f"- Container: {container['name']} ({container['type']})")
|
|
chat_time = chat.get("time") if chat else None
|
|
if chat_time:
|
|
lines.append(f"- Time: {chat_time}")
|
|
if scene is not None and scene.get("started_at"):
|
|
lines.append(f"- Active scene started: {scene['started_at']}")
|
|
if len(lines) == 1:
|
|
return None
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _format_dialogue_turn(turn: dict) -> str:
|
|
speaker = turn.get("speaker") or "?"
|
|
text = turn.get("text") or ""
|
|
return f"{speaker}: {text}"
|
|
|
|
|
|
def _build_dialogue_block(
|
|
recent: list[dict],
|
|
earlier_summary: str | None,
|
|
) -> str | None:
|
|
"""Render the recent-dialogue block. The ``recent`` list is the
|
|
*kept* tail of the dialogue (already trimmed to the last-N turns).
|
|
``earlier_summary``, when non-None, is rendered as the first line as
|
|
``earlier: <text>`` to flag elided context.
|
|
"""
|
|
if not recent and not earlier_summary:
|
|
return None
|
|
lines = ["RECENT DIALOGUE:"]
|
|
if earlier_summary:
|
|
lines.append(f"earlier: {earlier_summary}")
|
|
for turn in recent:
|
|
lines.append(_format_dialogue_turn(turn))
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_memories_block(memory_summaries: list[str]) -> str | None:
|
|
if not memory_summaries:
|
|
return None
|
|
lines = ["RELEVANT MEMORIES:"]
|
|
for m in memory_summaries:
|
|
lines.append(f"- {m}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_other_edges_block(edges: list[dict]) -> str | None:
|
|
"""Render edges to entities other than the addressee."""
|
|
if not edges:
|
|
return None
|
|
lines = ["OTHER EDGES:"]
|
|
for e in edges:
|
|
target = e.get("_display_name") or e.get("target_id", "?")
|
|
affinity = e.get("affinity", 50)
|
|
trust = e.get("trust", 50)
|
|
lines.append(f"- {target}: affinity {affinity}/100, trust {trust}/100")
|
|
summary = e.get("summary") or ""
|
|
if summary:
|
|
lines.append(f" summary: {summary}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_previous_scene_block(pov_summary: str | None) -> str | None:
|
|
if not pov_summary:
|
|
return None
|
|
return "PREVIOUS SCENE SUMMARY:\n" + pov_summary
|
|
|
|
|
|
def _closing_instruction(speaker_name: str, addressee_name: str) -> str:
|
|
return (
|
|
f"Continue the scene as {speaker_name}, in their voice, responding "
|
|
"naturally. Use *asterisks* for actions and quotes for dialogue. "
|
|
f"Stay in character. Do not narrate {addressee_name}'s actions or "
|
|
"thoughts."
|
|
)
|
|
|
|
|
|
def _join_blocks(blocks: list[str | None]) -> str:
|
|
"""Join non-empty blocks with double newlines."""
|
|
return "\n\n".join(b for b in blocks if b)
|
|
|
|
|
|
def _earlier_summary_placeholder(elided_count: int) -> str:
|
|
"""Phase 1 placeholder. Real summarization is a downstream concern."""
|
|
plural = "turn" if elided_count == 1 else "turns"
|
|
return f"{elided_count} earlier {plural} elided for brevity"
|
|
|
|
|
|
def _resolve_previous_scene_summary(
|
|
conn: Connection, chat_id: str, speaker_bot_id: str
|
|
) -> str | None:
|
|
"""Return ``pov_summary`` of the most recent ended scene, owned by
|
|
the speaker. None if no closed scene exists or no matching memory.
|
|
"""
|
|
row = conn.execute(
|
|
"SELECT id FROM scenes WHERE chat_id = ? AND ended_at IS NOT NULL "
|
|
"ORDER BY ended_at DESC LIMIT 1",
|
|
(chat_id,),
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
scene_id = row[0]
|
|
mem = conn.execute(
|
|
"SELECT pov_summary FROM memories WHERE scene_id = ? AND owner_id = ? "
|
|
"ORDER BY id DESC LIMIT 1",
|
|
(scene_id, speaker_bot_id),
|
|
).fetchone()
|
|
if not mem:
|
|
return None
|
|
return mem[0]
|
|
|
|
|
|
def _resolve_addressee(
|
|
conn: Connection, addressee: str, you: dict | None
|
|
) -> tuple[str, str]:
|
|
"""Return ``(addressee_id, addressee_display_name)``.
|
|
|
|
The function is permissive: ``addressee="you"`` resolves to the
|
|
you-entity (display name is its authored name, falling back to
|
|
"you" if no entity exists yet). Other ids resolve as bot ids.
|
|
"""
|
|
if addressee == "you":
|
|
name = (you or {}).get("name") or "you"
|
|
return "you", name
|
|
bot = get_bot(conn, addressee)
|
|
if bot is not None:
|
|
return addressee, bot["name"]
|
|
return addressee, addressee
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def assemble_narrative_prompt(
|
|
conn: Connection,
|
|
*,
|
|
chat_id: str,
|
|
speaker_bot_id: str,
|
|
addressee: str = "you",
|
|
user_turn_prose: str | None = None,
|
|
recent_dialogue: list[dict] | None = None,
|
|
retrieved_memory_summaries: list[str] | None = None,
|
|
budget_soft: int = 6000,
|
|
budget_hard: int = 8000,
|
|
encoding_name: str = "cl100k_base",
|
|
) -> list[Message]:
|
|
"""Assemble the narrative prompt for ``speaker_bot_id`` to respond.
|
|
|
|
Returns a list of :class:`Message` objects: one ``system`` message
|
|
carrying the assembled context, optionally followed by a single
|
|
``user`` message containing ``user_turn_prose`` (when provided).
|
|
|
|
Trimming proceeds in tiers (NICE → SHOULD) once the total token
|
|
count exceeds ``budget_soft``; the function refuses to exceed
|
|
``budget_hard``. If the MUST-include block alone is already over
|
|
``budget_hard``, :class:`ValueError` is raised — the caller should
|
|
surface the failure rather than ship a malformed prompt.
|
|
"""
|
|
encoding = (
|
|
_ENCODER if encoding_name == "cl100k_base"
|
|
else tiktoken.get_encoding(encoding_name)
|
|
)
|
|
|
|
bot = get_bot(conn, speaker_bot_id)
|
|
if bot is None:
|
|
raise ValueError(f"speaker_bot_id {speaker_bot_id!r} not found")
|
|
|
|
chat = get_chat(conn, chat_id)
|
|
if chat is None:
|
|
raise ValueError(f"chat_id {chat_id!r} not found")
|
|
|
|
you = get_you(conn)
|
|
addressee_id, addressee_name = _resolve_addressee(conn, addressee, you)
|
|
|
|
# ---- Build all components as text strings ------------------------------
|
|
|
|
speaker_identity = _build_speaker_identity(bot)
|
|
|
|
edge_to_addressee = _build_edge_block(
|
|
get_edge(conn, speaker_bot_id, addressee_id),
|
|
addressee_name,
|
|
)
|
|
|
|
# Activity for present entities. Phase 1: you + speaker bot. (When a
|
|
# guest is added in Phase 1+, callers that know about it can pass
|
|
# extra activities via a future hook; for now we keep it strict.)
|
|
activities: list[dict] = []
|
|
you_act = get_activity(conn, "you")
|
|
if you_act is not None:
|
|
you_act = dict(you_act)
|
|
you_act["_display_name"] = (you or {}).get("name") or "you"
|
|
activities.append(you_act)
|
|
bot_act = get_activity(conn, speaker_bot_id)
|
|
if bot_act is not None:
|
|
bot_act = dict(bot_act)
|
|
bot_act["_display_name"] = bot["name"]
|
|
activities.append(bot_act)
|
|
activity_block = _build_activity_block(activities)
|
|
|
|
container = None
|
|
if chat.get("active_scene_id"):
|
|
scene = get_scene(conn, chat["active_scene_id"])
|
|
if scene and scene.get("container_id"):
|
|
container = get_container(conn, scene["container_id"])
|
|
else:
|
|
scene = active_scene(conn, chat_id)
|
|
if container is None and scene and scene.get("container_id"):
|
|
container = get_container(conn, scene["container_id"])
|
|
scene_block = _build_scene_block(chat, container, scene)
|
|
|
|
# Other edges: speaker → non-addressee.
|
|
all_outgoing = list_edges_for(conn, speaker_bot_id)
|
|
other_edges_raw = [e for e in all_outgoing if e.get("target_id") != addressee_id]
|
|
for e in other_edges_raw:
|
|
tid = e.get("target_id")
|
|
if tid == "you":
|
|
e["_display_name"] = (you or {}).get("name") or "you"
|
|
else:
|
|
tb = get_bot(conn, tid) if tid else None
|
|
e["_display_name"] = tb["name"] if tb else (tid or "?")
|
|
other_edges_block = _build_other_edges_block(other_edges_raw)
|
|
|
|
# Memories: caller override wins; otherwise FTS5 search keyed on the
|
|
# scene's container/posture as a coarse query proxy.
|
|
if retrieved_memory_summaries is not None:
|
|
memory_summaries = list(retrieved_memory_summaries)
|
|
else:
|
|
query = (container or {}).get("name") or chat.get("narrative_anchor") or ""
|
|
memory_summaries = []
|
|
if query:
|
|
try:
|
|
hits = search_memories(conn, speaker_bot_id, "host", query, k=4)
|
|
memory_summaries = [h["pov_summary"] for h in hits]
|
|
except Exception:
|
|
memory_summaries = []
|
|
|
|
# Dialogue: caller override only (no event_log read in Phase 1).
|
|
dialogue_full = list(recent_dialogue or [])
|
|
|
|
previous_scene_summary = _resolve_previous_scene_summary(
|
|
conn, chat_id, speaker_bot_id
|
|
)
|
|
|
|
closing = _closing_instruction(bot["name"], addressee_name)
|
|
|
|
# ---- Build the MUST core ----------------------------------------------
|
|
|
|
last4 = dialogue_full[-4:] if dialogue_full else []
|
|
must_dialogue_block = _build_dialogue_block(last4, earlier_summary=None)
|
|
|
|
must_blocks: list[str | None] = [
|
|
speaker_identity,
|
|
edge_to_addressee,
|
|
scene_block,
|
|
activity_block,
|
|
must_dialogue_block,
|
|
closing,
|
|
]
|
|
must_text = _join_blocks(must_blocks)
|
|
must_tokens = _count_tokens(must_text, encoding)
|
|
if must_tokens > budget_hard:
|
|
raise ValueError(
|
|
f"MUST-include block ({must_tokens} tokens) exceeds budget_hard "
|
|
f"({budget_hard}). Cannot assemble prompt."
|
|
)
|
|
|
|
# ---- Stage SHOULD additions, then NICE additions -----------------------
|
|
|
|
# We carry a running "components" list and rebuild the body as we go
|
|
# so token accounting reflects join-overhead. Order in the final
|
|
# prompt follows §6.3: identity → edge → other edges → scene →
|
|
# activities → previous scene summary → memories → dialogue → close.
|
|
|
|
def assemble(
|
|
*,
|
|
include_other_edges: bool,
|
|
include_previous_scene: bool,
|
|
include_memories_top_k: int,
|
|
dialogue_keep: int,
|
|
) -> tuple[str, int, list[dict]]:
|
|
# dialogue: keep the last `dialogue_keep` turns verbatim; older
|
|
# turns become an "earlier:" placeholder line.
|
|
kept_dialogue = (
|
|
dialogue_full[-dialogue_keep:] if dialogue_keep > 0 else []
|
|
)
|
|
elided = max(0, len(dialogue_full) - len(kept_dialogue))
|
|
earlier_summary = (
|
|
_earlier_summary_placeholder(elided) if elided > 0 else None
|
|
)
|
|
dialogue_block = _build_dialogue_block(kept_dialogue, earlier_summary)
|
|
|
|
memories_subset = memory_summaries[:include_memories_top_k]
|
|
memories_block = _build_memories_block(memories_subset)
|
|
|
|
prev_block = (
|
|
_build_previous_scene_block(previous_scene_summary)
|
|
if include_previous_scene else None
|
|
)
|
|
|
|
body = _join_blocks([
|
|
speaker_identity,
|
|
edge_to_addressee,
|
|
other_edges_block if include_other_edges else None,
|
|
scene_block,
|
|
activity_block,
|
|
prev_block,
|
|
memories_block,
|
|
dialogue_block,
|
|
closing,
|
|
])
|
|
return body, _count_tokens(body, encoding), kept_dialogue
|
|
|
|
# Start with the MUST baseline: last 4 turns of dialogue, no
|
|
# SHOULD/NICE extras.
|
|
baseline_keep = min(4, len(dialogue_full))
|
|
|
|
# Try the most generous configuration first; trim greedily.
|
|
nice_dialogue_keep = len(dialogue_full) # all turns, no elision
|
|
nice_memories_k = min(4, len(memory_summaries))
|
|
include_prev = previous_scene_summary is not None
|
|
include_other = other_edges_block is not None
|
|
|
|
body, total, _ = assemble(
|
|
include_other_edges=include_other,
|
|
include_previous_scene=include_prev,
|
|
include_memories_top_k=nice_memories_k,
|
|
dialogue_keep=nice_dialogue_keep,
|
|
)
|
|
|
|
# If under soft, we're done.
|
|
if total <= budget_soft:
|
|
return _emit(body, user_turn_prose)
|
|
|
|
# Drop NICE in order: previous scene → memories beyond top-2 →
|
|
# older dialogue turns (collapse to 4).
|
|
if include_prev:
|
|
body, total, _ = assemble(
|
|
include_other_edges=include_other,
|
|
include_previous_scene=False,
|
|
include_memories_top_k=nice_memories_k,
|
|
dialogue_keep=nice_dialogue_keep,
|
|
)
|
|
include_prev = False
|
|
if total <= budget_soft:
|
|
return _emit(body, user_turn_prose)
|
|
|
|
if nice_memories_k > 2:
|
|
nice_memories_k = 2
|
|
body, total, _ = assemble(
|
|
include_other_edges=include_other,
|
|
include_previous_scene=False,
|
|
include_memories_top_k=nice_memories_k,
|
|
dialogue_keep=nice_dialogue_keep,
|
|
)
|
|
if total <= budget_soft:
|
|
return _emit(body, user_turn_prose)
|
|
|
|
if nice_dialogue_keep > baseline_keep:
|
|
nice_dialogue_keep = baseline_keep
|
|
body, total, _ = assemble(
|
|
include_other_edges=include_other,
|
|
include_previous_scene=False,
|
|
include_memories_top_k=nice_memories_k,
|
|
dialogue_keep=nice_dialogue_keep,
|
|
)
|
|
if total <= budget_soft:
|
|
return _emit(body, user_turn_prose)
|
|
|
|
# Drop more NICE until we're under hard: memories all the way to 0.
|
|
while nice_memories_k > 0 and total > budget_hard:
|
|
nice_memories_k = max(0, nice_memories_k - 1)
|
|
body, total, _ = assemble(
|
|
include_other_edges=include_other,
|
|
include_previous_scene=False,
|
|
include_memories_top_k=nice_memories_k,
|
|
dialogue_keep=nice_dialogue_keep,
|
|
)
|
|
|
|
# Drop SHOULD: other edges.
|
|
if include_other and total > budget_hard:
|
|
include_other = False
|
|
body, total, _ = assemble(
|
|
include_other_edges=False,
|
|
include_previous_scene=False,
|
|
include_memories_top_k=nice_memories_k,
|
|
dialogue_keep=nice_dialogue_keep,
|
|
)
|
|
|
|
if total > budget_hard:
|
|
# We've stripped everything optional and we still overflow.
|
|
# MUST alone fits (we checked at the top), so this means our
|
|
# last-4 dialogue + must blocks together exceed hard. Fall back
|
|
# to the bare MUST core.
|
|
body = must_text
|
|
total = must_tokens
|
|
if total > budget_hard:
|
|
raise ValueError(
|
|
f"Prompt cannot fit budget_hard={budget_hard}; MUST core "
|
|
f"is {total} tokens"
|
|
)
|
|
|
|
return _emit(body, user_turn_prose)
|
|
|
|
|
|
def _emit(system_body: str, user_turn_prose: str | None) -> list[Message]:
|
|
msgs: list[Message] = [Message(role="system", content=system_body)]
|
|
if user_turn_prose is not None:
|
|
msgs.append(Message(role="user", content=user_turn_prose))
|
|
return msgs
|
|
|
|
|
|
__all__ = ["assemble_narrative_prompt"]
|