d161e7b8e9
Bot replies were running long (4 paragraphs of action+dialogue beats per turn) because we never set max_tokens on the narrative call. Three tunable knobs now in Settings (set in data/config.toml to override): - narrative_max_tokens: int = 400 Hard cap on each generated response. ~400 tokens ≈ 1–2 short paragraphs. Drop to 200 for terse banter, bump to 800+ for longer scenes. - narrative_temperature: float = 0.85 Sampling temperature. 0.7 = grounded/consistent (slightly stiff), 0.85 = creative-but-in-character (default), 1.0 = wide variety, >1.0 = often off-the-rails. - prompt closing instruction now nudges: "Keep your response to a single beat — one or two short paragraphs at most. Don't monologue; leave room for the other person to react." Both turns.py (post_turn) and regenerate.py forward the params to client.stream(). FeatherlessClient already passes **params through to the OpenAI-compat endpoint. Note: temperature doesn't control length — that was a common misconception. max_tokens is the actual length cap. Lower temperature makes word choice more predictable (slightly stiffer voice), not shorter. Both knobs are useful for different goals.
557 lines
20 KiB
Python
557 lines
20 KiB
Python
"""Narrative-prompt assembly with must/should/nice trim tiers.

Implements Task 18 (Phase 1D). See Requirements §3.2 (token budgets and
trim tiers) and §6.3 (speaker prompt assembly order). The function
:func:`assemble_narrative_prompt` returns a list of
:class:`chat.llm.client.Message` objects ready to feed to
``LLMClient.generate``.

Trim policy when the assembled prompt exceeds the soft target:

- **MUST-include** (never trimmed): system / speaker identity, the
  speaker→addressee edge, the activity snapshot for all present
  entities, the current scene description, and the last 4 turns of
  dialogue.
- **SHOULD-include** (trim when over budget): other edges of the
  speaker. (Group nodes, active threads, and active events / props are
  Phase 3 — skipped here.)
- **NICE-include** (trim first): retrieved memories beyond top-2,
  dialogue turns beyond the last 4 (replaced with a one-line elision
  placeholder), per-POV summary of the previous scene.

Token counting uses ``tiktoken.get_encoding("cl100k_base")`` per the
requirements. Mistral / Llama tokenizers diverge ~5%; we accept the
drift.

The function is intentionally deterministic (no LLM call) so it is
testable with synthetic state and so T29's regenerate flow can rebuild
prompts without re-running classifiers.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from sqlite3 import Connection
|
|
|
|
import tiktoken
|
|
|
|
from chat.llm.client import Message
|
|
from chat.state.edges import get_edge, list_edges_for
|
|
from chat.state.entities import get_bot, get_you
|
|
from chat.state.memory import search_memories
|
|
from chat.state.world import (
|
|
active_scene,
|
|
get_activity,
|
|
get_chat,
|
|
get_container,
|
|
get_scene,
|
|
)
|
|
|
|
|
|
# Cache the encoder once at import-time. tiktoken's encoder load is
# non-trivial (~tens of ms) and the encoding is process-wide stable.
# This instance is the default encoder for token counting and is reused
# whenever the caller asks for the "cl100k_base" encoding by name.
_ENCODER = tiktoken.get_encoding("cl100k_base")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _count_tokens(text: str, encoding=_ENCODER) -> int:
    """Return the number of tokens ``encoding`` produces for ``text``.

    Args:
        text: The string to measure. Falsy values (``""`` or ``None``)
            count as 0 without touching the encoder.
        encoding: A tiktoken encoding object. Defaults to the cached
            module-level ``cl100k_base`` encoder.

    Returns:
        The token count as an ``int``.
    """
    if not text:
        return 0
    # Prompt text is free-form (personas, user dialogue) and may contain
    # a literal special-token string such as "<|endoftext|>". tiktoken
    # rejects those with ValueError by default; disabling the check makes
    # them count as ordinary text so measuring never aborts assembly.
    return len(encoding.encode(text, disallowed_special=()))
|
|
|
|
|
|
def _build_speaker_identity(bot: dict) -> str:
|
|
"""Render the bot identity block. Skips empty optional fields."""
|
|
lines = [f"You are {bot['name']}."]
|
|
if bot.get("persona"):
|
|
lines.append("")
|
|
lines.append("PERSONA:")
|
|
lines.append(bot["persona"])
|
|
voice_samples = bot.get("voice_samples") or []
|
|
if voice_samples:
|
|
lines.append("")
|
|
lines.append("VOICE REFERENCE:")
|
|
lines.append("\n---\n".join(voice_samples))
|
|
traits = bot.get("traits") or []
|
|
if traits:
|
|
lines.append("")
|
|
lines.append(f"TRAITS: {', '.join(traits)}")
|
|
if bot.get("backstory"):
|
|
lines.append("")
|
|
lines.append("BACKSTORY:")
|
|
lines.append(bot["backstory"])
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_edge_block(edge: dict | None, addressee_name: str) -> str | None:
|
|
"""Render the speaker → addressee edge. Returns None when no edge exists."""
|
|
if edge is None:
|
|
return None
|
|
lines = [f"YOUR EDGE TO {addressee_name}:"]
|
|
lines.append(f"- Affinity: {edge.get('affinity', 50)}/100")
|
|
lines.append(f"- Trust: {edge.get('trust', 50)}/100")
|
|
summary = edge.get("summary") or ""
|
|
if summary:
|
|
lines.append(f"- Summary: {summary}")
|
|
knowledge = edge.get("knowledge") or []
|
|
if knowledge:
|
|
lines.append(f"- What you know about {addressee_name}:")
|
|
for fact in knowledge:
|
|
lines.append(f" * {fact}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_activity_block(activities: list[dict]) -> str | None:
|
|
"""Render the activity snapshot for all present entities."""
|
|
rendered: list[str] = []
|
|
for a in activities:
|
|
if a is None:
|
|
continue
|
|
label = a.get("_display_name") or a.get("entity_id", "?")
|
|
parts: list[str] = []
|
|
posture = a.get("posture") or ""
|
|
if posture:
|
|
parts.append(posture)
|
|
action = a.get("action") or {}
|
|
verb = action.get("verb") if isinstance(action, dict) else None
|
|
if verb:
|
|
parts.append(verb)
|
|
attention = a.get("attention") or ""
|
|
if attention:
|
|
parts.append(f"attention: {attention}")
|
|
holding = a.get("holding") or []
|
|
if holding:
|
|
parts.append(f"holding: {', '.join(holding)}")
|
|
if parts:
|
|
rendered.append(f"- {label}: " + ", ".join(parts))
|
|
else:
|
|
rendered.append(f"- {label}: (no activity recorded)")
|
|
if not rendered:
|
|
return None
|
|
return "ACTIVITIES:\n" + "\n".join(rendered)
|
|
|
|
|
|
def _build_scene_block(chat: dict, container: dict | None, scene: dict | None) -> str | None:
|
|
"""Render the current-scene block. Always present when chat exists."""
|
|
lines = ["CURRENT SCENE:"]
|
|
if container is not None:
|
|
lines.append(f"- Container: {container['name']} ({container['type']})")
|
|
chat_time = chat.get("time") if chat else None
|
|
if chat_time:
|
|
lines.append(f"- Time: {chat_time}")
|
|
if scene is not None and scene.get("started_at"):
|
|
lines.append(f"- Active scene started: {scene['started_at']}")
|
|
if len(lines) == 1:
|
|
return None
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _format_dialogue_turn(turn: dict) -> str:
|
|
speaker = turn.get("speaker") or "?"
|
|
text = turn.get("text") or ""
|
|
return f"{speaker}: {text}"
|
|
|
|
|
|
def _build_dialogue_block(
|
|
recent: list[dict],
|
|
earlier_summary: str | None,
|
|
) -> str | None:
|
|
"""Render the recent-dialogue block. The ``recent`` list is the
|
|
*kept* tail of the dialogue (already trimmed to the last-N turns).
|
|
``earlier_summary``, when non-None, is rendered as the first line as
|
|
``earlier: <text>`` to flag elided context.
|
|
"""
|
|
if not recent and not earlier_summary:
|
|
return None
|
|
lines = ["RECENT DIALOGUE:"]
|
|
if earlier_summary:
|
|
lines.append(f"earlier: {earlier_summary}")
|
|
for turn in recent:
|
|
lines.append(_format_dialogue_turn(turn))
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_memories_block(memory_summaries: list[str]) -> str | None:
|
|
if not memory_summaries:
|
|
return None
|
|
lines = ["RELEVANT MEMORIES:"]
|
|
for m in memory_summaries:
|
|
lines.append(f"- {m}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_other_edges_block(edges: list[dict]) -> str | None:
|
|
"""Render edges to entities other than the addressee."""
|
|
if not edges:
|
|
return None
|
|
lines = ["OTHER EDGES:"]
|
|
for e in edges:
|
|
target = e.get("_display_name") or e.get("target_id", "?")
|
|
affinity = e.get("affinity", 50)
|
|
trust = e.get("trust", 50)
|
|
lines.append(f"- {target}: affinity {affinity}/100, trust {trust}/100")
|
|
summary = e.get("summary") or ""
|
|
if summary:
|
|
lines.append(f" summary: {summary}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_previous_scene_block(pov_summary: str | None) -> str | None:
|
|
if not pov_summary:
|
|
return None
|
|
return "PREVIOUS SCENE SUMMARY:\n" + pov_summary
|
|
|
|
|
|
def _closing_instruction(speaker_name: str, addressee_name: str) -> str:
|
|
return (
|
|
f"Continue the scene as {speaker_name}, in their voice, responding "
|
|
"naturally. Use *asterisks* for actions and quotes for dialogue. "
|
|
f"Stay in character. Do not narrate {addressee_name}'s actions or "
|
|
"thoughts. "
|
|
"Keep your response to a single beat — one or two short paragraphs "
|
|
"at most. Don't monologue; leave room for the other person to react."
|
|
)
|
|
|
|
|
|
def _join_blocks(blocks: list[str | None]) -> str:
|
|
"""Join non-empty blocks with double newlines."""
|
|
return "\n\n".join(b for b in blocks if b)
|
|
|
|
|
|
def _earlier_summary_placeholder(elided_count: int) -> str:
|
|
"""Phase 1 placeholder. Real summarization is a downstream concern."""
|
|
plural = "turn" if elided_count == 1 else "turns"
|
|
return f"{elided_count} earlier {plural} elided for brevity"
|
|
|
|
|
|
def _resolve_previous_scene_summary(
|
|
conn: Connection, chat_id: str, speaker_bot_id: str
|
|
) -> str | None:
|
|
"""Return ``pov_summary`` of the most recent ended scene, owned by
|
|
the speaker. None if no closed scene exists or no matching memory.
|
|
"""
|
|
row = conn.execute(
|
|
"SELECT id FROM scenes WHERE chat_id = ? AND ended_at IS NOT NULL "
|
|
"ORDER BY ended_at DESC LIMIT 1",
|
|
(chat_id,),
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
scene_id = row[0]
|
|
mem = conn.execute(
|
|
"SELECT pov_summary FROM memories WHERE scene_id = ? AND owner_id = ? "
|
|
"ORDER BY id DESC LIMIT 1",
|
|
(scene_id, speaker_bot_id),
|
|
).fetchone()
|
|
if not mem:
|
|
return None
|
|
return mem[0]
|
|
|
|
|
|
def _resolve_addressee(
|
|
conn: Connection, addressee: str, you: dict | None
|
|
) -> tuple[str, str]:
|
|
"""Return ``(addressee_id, addressee_display_name)``.
|
|
|
|
The function is permissive: ``addressee="you"`` resolves to the
|
|
you-entity (display name is its authored name, falling back to
|
|
"you" if no entity exists yet). Other ids resolve as bot ids.
|
|
"""
|
|
if addressee == "you":
|
|
name = (you or {}).get("name") or "you"
|
|
return "you", name
|
|
bot = get_bot(conn, addressee)
|
|
if bot is not None:
|
|
return addressee, bot["name"]
|
|
return addressee, addressee
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def assemble_narrative_prompt(
    conn: Connection,
    *,
    chat_id: str,
    speaker_bot_id: str,
    addressee: str = "you",
    user_turn_prose: str | None = None,
    recent_dialogue: list[dict] | None = None,
    retrieved_memory_summaries: list[str] | None = None,
    budget_soft: int = 6000,
    budget_hard: int = 8000,
    encoding_name: str = "cl100k_base",
) -> list[Message]:
    """Assemble the narrative prompt for ``speaker_bot_id`` to respond.

    Returns a list of :class:`Message` objects: one ``system`` message
    carrying the assembled context, optionally followed by a single
    ``user`` message containing ``user_turn_prose`` (when provided).

    Section order in the system message: identity → edge to addressee →
    other edges → scene → activities → previous-scene summary →
    memories → dialogue → closing instruction.

    Trimming applies to the system message only; ``user_turn_prose`` is
    not counted against either budget.

    1. While the prompt is over ``budget_soft``, NICE content is dropped
       in order: the previous-scene summary, memories beyond the top 2,
       dialogue turns beyond the last 4 (replaced by a one-line
       placeholder). Trimming stops as soon as the prompt fits.
    2. If it is still over ``budget_hard``, the remaining memories are
       dropped one at a time, then the other-edges block.
    3. If it is over ``budget_hard`` even then, the bare MUST core is
       used.

    A prompt that ends up between the two budgets is returned as is.

    Args:
        conn: Open database connection used for all state lookups.
        chat_id: Chat whose scene and history are being rendered.
        speaker_bot_id: Bot that will speak next.
        addressee: ``"you"`` or a bot id.
        user_turn_prose: Text of the user's turn; emitted as a trailing
            ``user`` message when not ``None``.
        recent_dialogue: Dialogue turns, oldest first. Nothing is read
            from storage when omitted.
        retrieved_memory_summaries: Memory summaries to use instead of
            running a memory search. At most the first 4 are used.
        budget_soft: Token count the trimming tries to get under.
        budget_hard: Token count the system message must not exceed.
        encoding_name: Name of the tiktoken encoding used for counting.

    Raises:
        ValueError: If the speaker bot or the chat does not exist, or if
            the MUST core alone exceeds ``budget_hard``.
    """
    encoding = (
        _ENCODER if encoding_name == "cl100k_base"
        else tiktoken.get_encoding(encoding_name)
    )

    bot = get_bot(conn, speaker_bot_id)
    if bot is None:
        raise ValueError(f"speaker_bot_id {speaker_bot_id!r} not found")

    chat = get_chat(conn, chat_id)
    if chat is None:
        raise ValueError(f"chat_id {chat_id!r} not found")

    you = get_you(conn)
    you_name = (you or {}).get("name") or "you"
    addressee_id, addressee_name = _resolve_addressee(conn, addressee, you)

    # ---- Build all components as text strings ------------------------------

    speaker_identity = _build_speaker_identity(bot)

    edge_to_addressee = _build_edge_block(
        get_edge(conn, speaker_bot_id, addressee_id),
        addressee_name,
    )

    # Activity for present entities. Phase 1: you + speaker bot only.
    # Rows are copied before the display name is attached so the objects
    # returned by the state layer are never modified.
    activities: list[dict] = []
    for entity_id, display_name in (("you", you_name), (speaker_bot_id, bot["name"])):
        activity = get_activity(conn, entity_id)
        if activity is not None:
            activity = dict(activity)
            activity["_display_name"] = display_name
            activities.append(activity)
    activity_block = _build_activity_block(activities)

    # Prefer the scene the chat points at; otherwise ask for the active one.
    if chat.get("active_scene_id"):
        scene = get_scene(conn, chat["active_scene_id"])
    else:
        scene = active_scene(conn, chat_id)
    container = None
    if scene and scene.get("container_id"):
        container = get_container(conn, scene["container_id"])
    scene_block = _build_scene_block(chat, container, scene)

    # Other edges: speaker → non-addressee. As with activities, each edge
    # is copied before being annotated with a display name.
    other_edges: list[dict] = []
    for edge in list_edges_for(conn, speaker_bot_id):
        target_id = edge.get("target_id")
        if target_id == addressee_id:
            continue
        annotated = dict(edge)
        if target_id == "you":
            annotated["_display_name"] = you_name
        else:
            target_bot = get_bot(conn, target_id) if target_id else None
            annotated["_display_name"] = (
                target_bot["name"] if target_bot else (target_id or "?")
            )
        other_edges.append(annotated)
    other_edges_block = _build_other_edges_block(other_edges)

    # Memories: caller override wins; otherwise search using the
    # container name (or the chat's narrative anchor) as a coarse query.
    if retrieved_memory_summaries is not None:
        memory_summaries = list(retrieved_memory_summaries)
    else:
        memory_summaries = []
        query = (container or {}).get("name") or chat.get("narrative_anchor") or ""
        if query:
            try:
                hits = search_memories(conn, speaker_bot_id, "host", query, k=4)
                memory_summaries = [h["pov_summary"] for h in hits]
            except Exception:
                # Retrieval is best-effort: a failed search must not block
                # the reply, but it should not vanish without a trace.
                import logging

                logging.getLogger(__name__).warning(
                    "memory search failed for speaker %r; continuing without memories",
                    speaker_bot_id,
                    exc_info=True,
                )
                memory_summaries = []

    # Dialogue: caller override only (no event_log read in Phase 1).
    dialogue_full = list(recent_dialogue or [])

    previous_scene_summary = _resolve_previous_scene_summary(
        conn, chat_id, speaker_bot_id
    )

    closing = _closing_instruction(bot["name"], addressee_name)

    # ---- Build the MUST core ----------------------------------------------

    must_text = _join_blocks([
        speaker_identity,
        edge_to_addressee,
        scene_block,
        activity_block,
        _build_dialogue_block(dialogue_full[-4:], earlier_summary=None),
        closing,
    ])
    must_tokens = _count_tokens(must_text, encoding)
    if must_tokens > budget_hard:
        raise ValueError(
            f"MUST-include block ({must_tokens} tokens) exceeds budget_hard "
            f"({budget_hard}). Cannot assemble prompt."
        )

    # ---- Render with optional content, trimming tier by tier ---------------

    def render(
        *,
        include_other_edges: bool,
        include_previous_scene: bool,
        include_memories_top_k: int,
        dialogue_keep: int,
    ) -> tuple[str, int]:
        """Build the system body for one configuration; return (body, tokens)."""
        # Keep the last `dialogue_keep` turns verbatim; older turns are
        # replaced by a single "earlier:" placeholder line. The explicit
        # zero check avoids `[-0:]`, which would select every turn.
        kept = dialogue_full[-dialogue_keep:] if dialogue_keep > 0 else []
        elided = len(dialogue_full) - len(kept)
        earlier = _earlier_summary_placeholder(elided) if elided > 0 else None

        body = _join_blocks([
            speaker_identity,
            edge_to_addressee,
            other_edges_block if include_other_edges else None,
            scene_block,
            activity_block,
            (
                _build_previous_scene_block(previous_scene_summary)
                if include_previous_scene else None
            ),
            _build_memories_block(memory_summaries[:include_memories_top_k]),
            _build_dialogue_block(kept, earlier),
            closing,
        ])
        return body, _count_tokens(body, encoding)

    # Most generous configuration: everything available, no elision.
    options = {
        "include_other_edges": other_edges_block is not None,
        "include_previous_scene": previous_scene_summary is not None,
        "include_memories_top_k": min(4, len(memory_summaries)),
        "dialogue_keep": len(dialogue_full),
    }
    body, total = render(**options)

    # NICE tier, in drop order. A step is listed only when it would
    # actually change the configuration.
    baseline_keep = min(4, len(dialogue_full))
    nice_trims: list[tuple[str, object]] = []
    if options["include_previous_scene"]:
        nice_trims.append(("include_previous_scene", False))
    if options["include_memories_top_k"] > 2:
        nice_trims.append(("include_memories_top_k", 2))
    if options["dialogue_keep"] > baseline_keep:
        nice_trims.append(("dialogue_keep", baseline_keep))

    for option, trimmed_value in nice_trims:
        if total <= budget_soft:
            break
        options[option] = trimmed_value
        body, total = render(**options)
    if total <= budget_soft:
        return _emit(body, user_turn_prose)

    # Still over soft. From here on, content is only removed while the
    # prompt exceeds the hard budget: remaining memories first ...
    while options["include_memories_top_k"] > 0 and total > budget_hard:
        options["include_memories_top_k"] -= 1
        body, total = render(**options)

    # ... then the SHOULD tier (other edges).
    if options["include_other_edges"] and total > budget_hard:
        options["include_other_edges"] = False
        body, total = render(**options)

    if total > budget_hard:
        # Everything optional is gone; what remains differs from the MUST
        # core only by the elision placeholder line. The MUST core was
        # verified against budget_hard above, so it is always a safe
        # fallback and no further check is needed.
        body = must_text

    return _emit(body, user_turn_prose)
|
|
|
|
|
|
def _emit(system_body: str, user_turn_prose: str | None) -> list[Message]:
    """Package the prompt as chat messages.

    The system message always comes first. A user message is appended
    only when ``user_turn_prose`` is not ``None`` (an empty string still
    produces a user message).
    """
    system_msg = Message(role="system", content=system_body)
    if user_turn_prose is None:
        return [system_msg]
    return [system_msg, Message(role="user", content=user_turn_prose)]
|
|
|
|
|
|
# Public surface of this module: only the prompt assembler is exported;
# the underscore-prefixed helpers above are internal.
__all__ = ["assemble_narrative_prompt"]
|