73bb8c1f17
T18 review (Phase 1) noted the NICE-tier trim drops previous-scene
FIRST while §6.3 spec lists previous-scene LAST in the NICE tier
group. Decision: keep the existing greedy order (previous-scene
first), and document why.
Rationale (now in code at the trim ladder):
1. Cheapest-impact-first — a per-POV previous-scene summary loses
less narrative continuity than the older dialogue turns or
memory hits it competes with.
2. Greedy lookahead is more expensive than the marginal narrative
loss. Dropping previous-scene typically clears the soft-budget
slack in one step.
Test added: test_nice_trim_order_documented pins the observed order
(previous-scene -> memories -> dialogue) so a future refactor can't
silently invert it. Sized so that all-NICE config overflows soft but
dropping just previous-scene fits — proves memories and older
dialogue turns survive while previous-scene is the FIRST drop.
736 lines
28 KiB
Python
736 lines
28 KiB
Python
"""Narrative-prompt assembly with must/should/nice trim tiers.
|
|
|
|
Implements Task 18 (Phase 1D). See Requirements §3.2 (token budgets and
|
|
trim tiers) and §6.3 (speaker prompt assembly order). The function
|
|
:func:`assemble_narrative_prompt` returns a list of
|
|
:class:`chat.llm.client.Message` objects ready to feed to
|
|
``LLMClient.generate``.
|
|
|
|
Trim policy when the assembled prompt exceeds the soft target:
|
|
|
|
- **MUST-include** (never trimmed): system / speaker identity, the
|
|
speaker→addressee edge, the activity snapshot for all present
|
|
entities, the current scene description, and the last 4 turns of
|
|
dialogue.
|
|
- **SHOULD-include** (trim when over budget): other edges of the
  speaker, the group-node block in three-entity scenes, and the "you"
  and guest activity bullets. (Active threads and active events /
  props are Phase 3 — skipped here.)
|
|
- **NICE-include** (trim first): retrieved memories beyond top-2,
|
|
dialogue turns beyond the last 4 (replaced with a one-line elision
|
|
placeholder), per-POV summary of the previous scene.
|
|
|
|
Token counting uses ``tiktoken.get_encoding("cl100k_base")`` per the
|
|
requirements. Mistral / Llama tokenizers diverge ~5%; we accept the
|
|
drift.
|
|
|
|
The function is intentionally deterministic (no LLM call) so it is
|
|
testable with synthetic state and so T29's regenerate flow can rebuild
|
|
prompts without re-running classifiers.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from sqlite3 import Connection
|
|
|
|
import tiktoken
|
|
|
|
from chat.llm.client import Message
|
|
from chat.state.edges import get_edge, list_edges_for
|
|
from chat.state.entities import get_bot, get_you
|
|
from chat.state.group_node import get_group_node
|
|
from chat.state.memory import search_memories
|
|
from chat.state.world import (
|
|
active_scene,
|
|
get_activity,
|
|
get_chat,
|
|
get_container,
|
|
get_scene,
|
|
)
|
|
|
|
|
|
# Cache the encoder once at import-time. tiktoken's encoder load is
|
|
# non-trivial (~tens of ms) and the encoding is process-wide stable.
|
|
# Shared encoder instance: the default ``encoding`` of ``_count_tokens`` and
# the encoder ``assemble_narrative_prompt`` reuses whenever its
# ``encoding_name`` is "cl100k_base".
_ENCODER = tiktoken.get_encoding("cl100k_base")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _count_tokens(text: str, encoding=_ENCODER) -> int:
    """Count the tokens in ``text`` under ``encoding``.

    ``encoding`` defaults to the module-level cl100k_base encoder. Falsy
    input (``None`` or the empty string) short-circuits to 0 without
    touching the encoder.
    """
    return len(encoding.encode(text)) if text else 0
|
|
|
|
|
|
def _build_speaker_identity(bot: dict) -> str:
|
|
"""Render the bot identity block. Skips empty optional fields."""
|
|
lines = [f"You are {bot['name']}."]
|
|
if bot.get("persona"):
|
|
lines.append("")
|
|
lines.append("PERSONA:")
|
|
lines.append(bot["persona"])
|
|
voice_samples = bot.get("voice_samples") or []
|
|
if voice_samples:
|
|
lines.append("")
|
|
lines.append("VOICE REFERENCE:")
|
|
lines.append("\n---\n".join(voice_samples))
|
|
traits = bot.get("traits") or []
|
|
if traits:
|
|
lines.append("")
|
|
lines.append(f"TRAITS: {', '.join(traits)}")
|
|
if bot.get("backstory"):
|
|
lines.append("")
|
|
lines.append("BACKSTORY:")
|
|
lines.append(bot["backstory"])
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_edge_block(edge: dict | None, addressee_name: str) -> str | None:
|
|
"""Render the speaker → addressee edge. Returns None when no edge exists."""
|
|
if edge is None:
|
|
return None
|
|
lines = [f"YOUR EDGE TO {addressee_name}:"]
|
|
lines.append(f"- Affinity: {edge.get('affinity', 50)}/100")
|
|
lines.append(f"- Trust: {edge.get('trust', 50)}/100")
|
|
summary = edge.get("summary") or ""
|
|
if summary:
|
|
lines.append(f"- Summary: {summary}")
|
|
knowledge = edge.get("knowledge") or []
|
|
if knowledge:
|
|
lines.append(f"- What you know about {addressee_name}:")
|
|
for fact in knowledge:
|
|
lines.append(f" * {fact}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_activity_block(activities: list[dict]) -> str | None:
|
|
"""Render the activity snapshot for all present entities."""
|
|
rendered: list[str] = []
|
|
for a in activities:
|
|
if a is None:
|
|
continue
|
|
label = a.get("_display_name") or a.get("entity_id", "?")
|
|
parts: list[str] = []
|
|
posture = a.get("posture") or ""
|
|
if posture:
|
|
parts.append(posture)
|
|
action = a.get("action") or {}
|
|
verb = action.get("verb") if isinstance(action, dict) else None
|
|
if verb:
|
|
parts.append(verb)
|
|
attention = a.get("attention") or ""
|
|
if attention:
|
|
parts.append(f"attention: {attention}")
|
|
holding = a.get("holding") or []
|
|
if holding:
|
|
parts.append(f"holding: {', '.join(holding)}")
|
|
if parts:
|
|
rendered.append(f"- {label}: " + ", ".join(parts))
|
|
else:
|
|
rendered.append(f"- {label}: (no activity recorded)")
|
|
if not rendered:
|
|
return None
|
|
return "ACTIVITIES:\n" + "\n".join(rendered)
|
|
|
|
|
|
def _build_scene_block(chat: dict, container: dict | None, scene: dict | None) -> str | None:
|
|
"""Render the current-scene block. Always present when chat exists."""
|
|
lines = ["CURRENT SCENE:"]
|
|
if container is not None:
|
|
lines.append(f"- Container: {container['name']} ({container['type']})")
|
|
chat_time = chat.get("time") if chat else None
|
|
if chat_time:
|
|
lines.append(f"- Time: {chat_time}")
|
|
if scene is not None and scene.get("started_at"):
|
|
lines.append(f"- Active scene started: {scene['started_at']}")
|
|
if len(lines) == 1:
|
|
return None
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _format_dialogue_turn(turn: dict) -> str:
|
|
speaker = turn.get("speaker") or "?"
|
|
text = turn.get("text") or ""
|
|
return f"{speaker}: {text}"
|
|
|
|
|
|
def _build_dialogue_block(
    recent: list[dict],
    earlier_summary: str | None,
) -> str | None:
    """Format the recent-dialogue section.

    ``recent`` is the tail of the conversation the caller decided to keep
    verbatim; each turn becomes one ``speaker: text`` line. When
    ``earlier_summary`` is non-empty it is placed directly under the
    header as ``earlier: <text>`` to signal that older turns were elided.
    Yields ``None`` when there are neither turns nor a summary.
    """
    if not (recent or earlier_summary):
        return None

    out = ["RECENT DIALOGUE:"]
    if earlier_summary:
        out.append(f"earlier: {earlier_summary}")
    out.extend(_format_dialogue_turn(turn) for turn in recent)
    return "\n".join(out)
|
|
|
|
|
|
def _build_memories_block(memory_summaries: list[str]) -> str | None:
|
|
if not memory_summaries:
|
|
return None
|
|
lines = ["RELEVANT MEMORIES:"]
|
|
for m in memory_summaries:
|
|
lines.append(f"- {m}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_other_edges_block(edges: list[dict]) -> str | None:
|
|
"""Render edges to entities other than the addressee."""
|
|
if not edges:
|
|
return None
|
|
lines = ["OTHER EDGES:"]
|
|
for e in edges:
|
|
target = e.get("_display_name") or e.get("target_id", "?")
|
|
affinity = e.get("affinity", 50)
|
|
trust = e.get("trust", 50)
|
|
lines.append(f"- {target}: affinity {affinity}/100, trust {trust}/100")
|
|
summary = e.get("summary") or ""
|
|
if summary:
|
|
lines.append(f" summary: {summary}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_previous_scene_block(pov_summary: str | None) -> str | None:
|
|
if not pov_summary:
|
|
return None
|
|
return "PREVIOUS SCENE SUMMARY:\n" + pov_summary
|
|
|
|
|
|
def _build_group_node_block(group_node: dict | None) -> str | None:
|
|
"""Render the group-node summary + dynamic as a SHOULD-tier block.
|
|
|
|
Used only in 3-entity scenes (you + host + guest). Returns None when
|
|
the row is missing or both summary and dynamic are empty.
|
|
"""
|
|
if not group_node:
|
|
return None
|
|
summary = (group_node.get("summary") or "").strip()
|
|
dynamic = (group_node.get("dynamic") or "").strip()
|
|
if not summary and not dynamic:
|
|
return None
|
|
lines = ["Group dynamic:"]
|
|
if summary:
|
|
lines.append(f"- Summary: {summary}")
|
|
if dynamic:
|
|
lines.append(f"- Dynamic: {dynamic}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _closing_instruction(speaker_name: str, addressee_name: str) -> str:
|
|
return (
|
|
f"Continue the scene as {speaker_name}, in their voice, responding "
|
|
"naturally. Use *asterisks* for actions and quotes for dialogue. "
|
|
f"Stay in character. Do not narrate {addressee_name}'s actions or "
|
|
"thoughts. "
|
|
"Keep your response to a single beat — one or two short paragraphs "
|
|
"at most. Don't monologue; leave room for the other person to react."
|
|
)
|
|
|
|
|
|
def _join_blocks(blocks: list[str | None]) -> str:
|
|
"""Join non-empty blocks with double newlines."""
|
|
return "\n\n".join(b for b in blocks if b)
|
|
|
|
|
|
def _earlier_summary_placeholder(elided_count: int) -> str:
|
|
"""Phase 1 placeholder. Real summarization is a downstream concern."""
|
|
plural = "turn" if elided_count == 1 else "turns"
|
|
return f"{elided_count} earlier {plural} elided for brevity"
|
|
|
|
|
|
def _resolve_previous_scene_summary(
|
|
conn: Connection, chat_id: str, speaker_bot_id: str
|
|
) -> str | None:
|
|
"""Return ``pov_summary`` of the most recent ended scene, owned by
|
|
the speaker. None if no closed scene exists or no matching memory.
|
|
"""
|
|
row = conn.execute(
|
|
"SELECT id FROM scenes WHERE chat_id = ? AND ended_at IS NOT NULL "
|
|
"ORDER BY ended_at DESC LIMIT 1",
|
|
(chat_id,),
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
scene_id = row[0]
|
|
mem = conn.execute(
|
|
"SELECT pov_summary FROM memories WHERE scene_id = ? AND owner_id = ? "
|
|
"ORDER BY id DESC LIMIT 1",
|
|
(scene_id, speaker_bot_id),
|
|
).fetchone()
|
|
if not mem:
|
|
return None
|
|
return mem[0]
|
|
|
|
|
|
def _witness_role_for(speaker_bot_id: str, host_bot_id: str | None) -> str:
|
|
"""Return the witness POV role for the speaker's memory query.
|
|
|
|
The host bot of a chat queries memories with ``witness_role="host"``;
|
|
the guest bot queries with ``witness_role="guest"``. Phase 2 T46
|
|
pinned the contract on ``search_memories``; this helper applies it
|
|
at the call site so a guest-as-speaker doesn't silently retrieve
|
|
memories under the wrong POV mask.
|
|
"""
|
|
return "host" if speaker_bot_id == host_bot_id else "guest"
|
|
|
|
|
|
def _resolve_addressee(
|
|
conn: Connection, addressee: str, you: dict | None
|
|
) -> tuple[str, str]:
|
|
"""Return ``(addressee_id, addressee_display_name)``.
|
|
|
|
The function is permissive: ``addressee="you"`` resolves to the
|
|
you-entity (display name is its authored name, falling back to
|
|
"you" if no entity exists yet). Other ids resolve as bot ids.
|
|
"""
|
|
if addressee == "you":
|
|
name = (you or {}).get("name") or "you"
|
|
return "you", name
|
|
bot = get_bot(conn, addressee)
|
|
if bot is not None:
|
|
return addressee, bot["name"]
|
|
return addressee, addressee
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def assemble_narrative_prompt(
    conn: Connection,
    *,
    chat_id: str,
    speaker_bot_id: str,
    addressee: str = "you",
    user_turn_prose: str | None = None,
    recent_dialogue: list[dict] | None = None,
    retrieved_memory_summaries: list[str] | None = None,
    budget_soft: int = 6000,
    budget_hard: int = 8000,
    encoding_name: str = "cl100k_base",
    guest_id: str | None = None,
) -> list[Message]:
    """Assemble the narrative prompt for ``speaker_bot_id`` to respond.

    Returns a list of :class:`Message` objects: one ``system`` message
    carrying the assembled context, optionally followed by a single
    ``user`` message containing ``user_turn_prose`` (when provided).

    Trim ladder (every rung re-renders and re-counts the whole body):

    1. While the body is over ``budget_soft``: drop the previous-scene
       summary, cap memories at the top 2, then collapse dialogue to the
       last 4 turns (older turns become a one-line elision placeholder).
       The function returns as soon as a rung fits the soft budget.
    2. While the body is over ``budget_hard``: drop the remaining
       memories one at a time, then the guest activity bullet, the
       group-node block, the "you" activity bullet and finally the
       other-edges block.
    3. If the body still exceeds ``budget_hard``, fall back to the bare
       MUST core.

    Args:
        conn: Database connection handed to the state accessors.
        chat_id: Chat whose scene and state are rendered.
        speaker_bot_id: Bot that will speak next.
        addressee: ``"you"`` or a bot id; see ``_resolve_addressee``.
        user_turn_prose: Appended as a ``user`` message when not ``None``.
            Its tokens are not counted against either budget.
        recent_dialogue: Dialogue turns, oldest first. Each is rendered
            from its ``speaker`` and ``text`` keys. No event log is read;
            ``None`` means no dialogue.
        retrieved_memory_summaries: Caller-supplied memory summaries.
            When ``None``, up to 4 are fetched with ``search_memories``
            using the container name (or the chat's ``narrative_anchor``)
            as the query.
        budget_soft: Token count at which NICE-tier trimming starts.
        budget_hard: Token ceiling for the system body.
        encoding_name: tiktoken encoding used for counting. The default
            reuses the module-level encoder; other names are loaded via
            ``tiktoken.get_encoding``.
        guest_id: Third present bot, if any. When ``None`` it is taken
            from the chat's ``guest_bot_id``; a guest equal to the
            speaker is ignored.

    Raises:
        ValueError: If the speaker or chat cannot be found, or if the
            MUST-include core alone exceeds ``budget_hard``.
    """
    encoding = (
        _ENCODER if encoding_name == "cl100k_base"
        else tiktoken.get_encoding(encoding_name)
    )

    bot = get_bot(conn, speaker_bot_id)
    if bot is None:
        raise ValueError(f"speaker_bot_id {speaker_bot_id!r} not found")

    chat = get_chat(conn, chat_id)
    if chat is None:
        raise ValueError(f"chat_id {chat_id!r} not found")

    # Auto-detect guest from chat state when caller didn't pass one.
    # Phase 1 chats have ``guest_bot_id is None``; the auto-detect is a
    # no-op there and the function behaves exactly as before.
    if guest_id is None:
        guest_id = chat.get("guest_bot_id")
    # A speaker addressing themself as guest doesn't add a third party.
    if guest_id is not None and guest_id == speaker_bot_id:
        guest_id = None

    you = get_you(conn)
    you_name = (you or {}).get("name") or "you"
    addressee_id, addressee_name = _resolve_addressee(conn, addressee, you)

    # ---- Build all components as text strings ------------------------------

    speaker_identity = _build_speaker_identity(bot)

    edge_to_addressee = _build_edge_block(
        get_edge(conn, speaker_bot_id, addressee_id),
        addressee_name,
    )

    # Activity for present entities — single ACTIVITIES: block with up
    # to three bullets (you, speaker, guest). The block itself is
    # MUST-tier and survives all trims, but bullet-level trim drops
    # bullets in the order guest -> you, keeping the speaker bullet
    # (the speaker's own current activity is the load-bearing slice).
    #
    # T71.2 chose Option B from the polish plan: pre-truncate the
    # bullets list at trim time before _build_activity_block runs,
    # rather than introducing a granular tier mode in the trim
    # machinery. The single-block render avoids the dual-ACTIVITIES:
    # header that Phase 2 T43 introduced (read by some LLMs as a
    # duplicate-section bug).
    #
    # Each row is copied before the display label is attached so the
    # dict returned by get_activity is never modified.
    you_activity: dict | None = None
    you_act = get_activity(conn, "you")
    if you_act is not None:
        you_activity = dict(you_act)
        you_activity["_display_name"] = you_name

    speaker_activity: dict | None = None
    bot_act = get_activity(conn, speaker_bot_id)
    if bot_act is not None:
        speaker_activity = dict(bot_act)
        speaker_activity["_display_name"] = bot["name"]

    guest_activity: dict | None = None
    if guest_id is not None:
        guest_act = get_activity(conn, guest_id)
        if guest_act is not None:
            guest_activity = dict(guest_act)
            guest_bot = get_bot(conn, guest_id)
            guest_activity["_display_name"] = (
                guest_bot["name"] if guest_bot else guest_id
            )

    def _activity_block_for(
        *, include_you: bool, include_guest: bool
    ) -> str | None:
        """Render the single ACTIVITIES: block with the requested bullets.

        Speaker bullet is always included (it's the MUST-tier baseline);
        ``you`` and ``guest`` bullets are toggled by the caller during
        trim. Returns None when no bullets remain.
        """
        bullets: list[dict] = []
        if include_you and you_activity is not None:
            bullets.append(you_activity)
        if speaker_activity is not None:
            bullets.append(speaker_activity)
        if include_guest and guest_activity is not None:
            bullets.append(guest_activity)
        return _build_activity_block(bullets)

    # SHOULD-tier group-node block (Phase 2 / Task 43): rendered only
    # when the group_node row is present AND it covers all three of
    # you + host + guest (per the Task 43 spec).
    group_node_block: str | None = None
    if guest_id is not None:
        gn = get_group_node(conn, chat_id)
        if gn is not None:
            members = set(gn.get("members") or [])
            required = {"you", guest_id}
            host_id = chat.get("host_bot_id")
            if host_id is not None:
                required.add(host_id)
            if required.issubset(members):
                group_node_block = _build_group_node_block(gn)

    # Scene: prefer the chat's recorded active_scene_id, otherwise ask
    # for the active scene. The container is resolved once from
    # whichever scene was found.
    if chat.get("active_scene_id"):
        scene = get_scene(conn, chat["active_scene_id"])
    else:
        scene = active_scene(conn, chat_id)
    container = None
    if scene and scene.get("container_id"):
        container = get_container(conn, scene["container_id"])
    scene_block = _build_scene_block(chat, container, scene)

    # Other edges: speaker → non-addressee. Rows are copied before the
    # display label is attached, matching the activity rows above, so the
    # dicts handed back by list_edges_for are left untouched.
    other_edges: list[dict] = []
    for row in list_edges_for(conn, speaker_bot_id):
        target_id = row.get("target_id")
        if target_id == addressee_id:
            continue
        edge = dict(row)
        if target_id == "you":
            edge["_display_name"] = you_name
        else:
            target_bot = get_bot(conn, target_id) if target_id else None
            edge["_display_name"] = (
                target_bot["name"] if target_bot else (target_id or "?")
            )
        other_edges.append(edge)
    other_edges_block = _build_other_edges_block(other_edges)

    # Memories: caller override wins; otherwise search with the
    # container name (falling back to the chat's narrative_anchor) as a
    # coarse query proxy.
    if retrieved_memory_summaries is not None:
        memory_summaries = list(retrieved_memory_summaries)
    else:
        memory_summaries = []
        query = (container or {}).get("name") or chat.get("narrative_anchor") or ""
        if query:
            try:
                witness_role = _witness_role_for(
                    speaker_bot_id, chat.get("host_bot_id")
                )
                hits = search_memories(
                    conn, speaker_bot_id, witness_role, query, k=4
                )
                memory_summaries = [h["pov_summary"] for h in hits]
            except Exception:
                # Best-effort retrieval: any failure here degrades to a
                # prompt without memories instead of aborting assembly.
                memory_summaries = []

    # Dialogue: caller override only (no event_log read in Phase 1).
    dialogue_full = list(recent_dialogue or [])

    previous_scene_summary = _resolve_previous_scene_summary(
        conn, chat_id, speaker_bot_id
    )

    closing = _closing_instruction(bot["name"], addressee_name)

    # ---- Build the MUST core ----------------------------------------------

    # MUST floor: last 4 dialogue turns with no elision line, and the
    # ACTIVITIES block reduced to the speaker bullet alone.
    must_text = _join_blocks([
        speaker_identity,
        edge_to_addressee,
        scene_block,
        _activity_block_for(include_you=False, include_guest=False),
        _build_dialogue_block(dialogue_full[-4:], earlier_summary=None),
        closing,
    ])
    must_tokens = _count_tokens(must_text, encoding)
    if must_tokens > budget_hard:
        raise ValueError(
            f"MUST-include block ({must_tokens} tokens) exceeds budget_hard "
            f"({budget_hard}). Cannot assemble prompt."
        )

    # ---- Trim state: start from the most generous configuration ------------

    baseline_keep = min(4, len(dialogue_full))  # MUST floor for dialogue
    dialogue_keep = len(dialogue_full)  # all turns, no elision
    memories_k = min(4, len(memory_summaries))
    keep_prev = previous_scene_summary is not None
    keep_other = other_edges_block is not None
    keep_you_activity = you_activity is not None
    keep_guest_activity = guest_activity is not None
    keep_group_node = group_node_block is not None

    def _render() -> tuple[str, int]:
        """Render the body for the current trim state; return (text, tokens).

        The whole body is rebuilt and re-counted each time so the token
        total reflects join overhead. Block order follows §6.3:
        identity → edge → other edges → scene → activities → group node
        → previous scene summary → memories → dialogue → close.
        """
        # Keep the last ``dialogue_keep`` turns verbatim; older turns
        # collapse into a single "earlier:" placeholder line.
        kept = dialogue_full[-dialogue_keep:] if dialogue_keep > 0 else []
        elided = len(dialogue_full) - len(kept)
        earlier = _earlier_summary_placeholder(elided) if elided > 0 else None

        text = _join_blocks([
            speaker_identity,
            edge_to_addressee,
            other_edges_block if keep_other else None,
            scene_block,
            _activity_block_for(
                include_you=keep_you_activity,
                include_guest=keep_guest_activity,
            ),
            group_node_block if keep_group_node else None,
            _build_previous_scene_block(previous_scene_summary) if keep_prev else None,
            _build_memories_block(memory_summaries[:memories_k]),
            _build_dialogue_block(kept, earlier),
            closing,
        ])
        return text, _count_tokens(text, encoding)

    body, total = _render()

    # If under soft, we're done.
    if total <= budget_soft:
        return _emit(body, user_turn_prose)

    # Drop NICE in order: previous scene → memories beyond top-2 →
    # older dialogue turns (collapse to 4).
    #
    # T71.3 — order rationale: the §6.3 spec lists NICE-tier members
    # with previous-scene LAST, which read as a literal trim order
    # during T18 review. We deliberately keep the greedy order shown
    # here (previous-scene FIRST) for two reasons:
    #
    #   1. Cheapest-impact-first: a per-POV previous-scene summary is
    #      a single short paragraph that loses very little narrative
    #      continuity when dropped, while the older dialogue turns it
    #      is competing with carry the speaker's last few beats — those
    #      ground the next response far more concretely.
    #   2. Greedy lookahead is more expensive than the marginal
    #      narrative loss. Dropping previous-scene typically clears
    #      the soft-budget slack in one step; trying memories or
    #      dialogue first would routinely require multiple recompute
    #      passes through the assembler.
    #
    # The pin test test_nice_trim_order_documented locks this order so
    # a future refactor can't quietly invert it without surfacing the
    # decision.
    if keep_prev:
        keep_prev = False
        body, total = _render()
        if total <= budget_soft:
            return _emit(body, user_turn_prose)

    if memories_k > 2:
        memories_k = 2
        body, total = _render()
        if total <= budget_soft:
            return _emit(body, user_turn_prose)

    if dialogue_keep > baseline_keep:
        dialogue_keep = baseline_keep
        body, total = _render()
        if total <= budget_soft:
            return _emit(body, user_turn_prose)

    # From here on only the hard budget forces further cuts.
    # Drop more NICE until we're under hard: memories all the way to 0.
    while memories_k > 0 and total > budget_hard:
        memories_k -= 1
        body, total = _render()

    # Drop SHOULD-tier extras in order:
    #   1. guest activity bullet (T71.2: bullet-level trim within the
    #      single ACTIVITIES: block — guest goes first per Task 43 spec)
    #   2. group node block
    #   3. you activity bullet (still SHOULD-tier; speaker bullet is the
    #      MUST-tier floor and never dropped)
    #   4. other edges
    if keep_guest_activity and total > budget_hard:
        keep_guest_activity = False
        body, total = _render()

    if keep_group_node and total > budget_hard:
        keep_group_node = False
        body, total = _render()

    if keep_you_activity and total > budget_hard:
        keep_you_activity = False
        body, total = _render()

    if keep_other and total > budget_hard:
        keep_other = False
        body, total = _render()

    if total > budget_hard:
        # Everything optional is gone and the body still overflows. What
        # remains beyond the MUST core is at most the dialogue elision
        # placeholder, so fall back to the bare MUST core, which was
        # checked against the hard budget above.
        body = must_text
        total = must_tokens
        if total > budget_hard:
            # Defensive: the earlier MUST check should make this unreachable.
            raise ValueError(
                f"Prompt cannot fit budget_hard={budget_hard}; MUST core "
                f"is {total} tokens"
            )

    return _emit(body, user_turn_prose)
|
|
|
|
|
|
def _emit(system_body: str, user_turn_prose: str | None) -> list[Message]:
    """Wrap the assembled body (and optional user turn) as chat messages.

    The result always starts with a ``system`` message holding
    ``system_body``. A ``user`` message follows whenever
    ``user_turn_prose`` is not ``None`` — an empty string still produces
    one.
    """
    system_msg = Message(role="system", content=system_body)
    if user_turn_prose is None:
        return [system_msg]
    return [system_msg, Message(role="user", content=user_turn_prose)]
|
|
|
|
|
|
# Only the prompt assembler is exported; the underscore-prefixed helpers
# above are module-internal.
__all__ = ["assemble_narrative_prompt"]
|