diff --git a/chat/services/prompt.py b/chat/services/prompt.py new file mode 100644 index 0000000..89b416e --- /dev/null +++ b/chat/services/prompt.py @@ -0,0 +1,554 @@ +"""Narrative-prompt assembly with must/should/nice trim tiers. + +Implements Task 18 (Phase 1D). See Requirements §3.2 (token budgets and +trim tiers) and §6.3 (speaker prompt assembly order). The function +:func:`assemble_narrative_prompt` returns a list of +:class:`chat.llm.client.Message` objects ready to feed to +``LLMClient.generate``. + +Trim policy when the assembled prompt exceeds the soft target: + +- **MUST-include** (never trimmed): system / speaker identity, the + speaker→addressee edge, the activity snapshot for all present + entities, the current scene description, and the last 4 turns of + dialogue. +- **SHOULD-include** (trim when over budget): other edges of the + speaker. (Group nodes, active threads, and active events / props are + Phase 3 — skipped here.) +- **NICE-include** (trim first): retrieved memories beyond top-2, + dialogue turns beyond the last 4 (replaced with a one-line elision + placeholder), per-POV summary of the previous scene. + +Token counting uses ``tiktoken.get_encoding("cl100k_base")`` per the +requirements. Mistral / Llama tokenizers diverge ~5%; we accept the +drift. + +The function is intentionally deterministic (no LLM call) so it is +testable with synthetic state and so T29's regenerate flow can rebuild +prompts without re-running classifiers. +""" + +from __future__ import annotations + +from sqlite3 import Connection + +import tiktoken + +from chat.llm.client import Message +from chat.state.edges import get_edge, list_edges_for +from chat.state.entities import get_bot, get_you +from chat.state.memory import search_memories +from chat.state.world import ( + active_scene, + get_activity, + get_chat, + get_container, + get_scene, +) + + +# Cache the encoder once at import-time. 
# tiktoken's encoder load is non-trivial (~tens of ms) and the encoding is
# process-wide stable, so the default encoder is built exactly once.
_ENCODER = tiktoken.get_encoding("cl100k_base")

# Trailing dialogue turns that are MUST-include (never trimmed).
_MUST_DIALOGUE_TURNS = 4
# Memory summaries rendered at most, and how many survive the NICE trim.
_MAX_MEMORIES = 4
_KEPT_MEMORIES = 2


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _count_tokens(text: str, encoding=_ENCODER) -> int:
    """Return the number of tokens ``encoding`` yields for ``text``.

    Falsy input (``None`` / ``""``) counts as 0 without touching the encoder.
    """
    if not text:
        return 0
    # Prompt text is free-form (dialogue, memories) and may contain literals
    # such as "<|endoftext|>". tiktoken rejects those by default with a
    # ValueError; for pure counting we want them treated as ordinary text.
    return len(encoding.encode(text, disallowed_special=()))


def _build_speaker_identity(bot: dict) -> str:
    """Render the bot identity block.

    ``bot["name"]`` is required; persona, voice samples, traits and
    backstory are each rendered only when present and non-empty.
    """
    lines = [f"You are {bot['name']}."]
    if bot.get("persona"):
        lines += ["", "PERSONA:", bot["persona"]]
    voice_samples = bot.get("voice_samples") or []
    if voice_samples:
        lines += ["", "VOICE REFERENCE:", "\n---\n".join(voice_samples)]
    traits = bot.get("traits") or []
    if traits:
        lines += ["", f"TRAITS: {', '.join(traits)}"]
    if bot.get("backstory"):
        lines += ["", "BACKSTORY:", bot["backstory"]]
    return "\n".join(lines)


def _build_edge_block(edge: dict | None, addressee_name: str) -> str | None:
    """Render the speaker → addressee edge, or None when no edge exists.

    Missing ``affinity`` / ``trust`` values are rendered as 50.
    """
    if edge is None:
        return None
    lines = [
        f"YOUR EDGE TO {addressee_name}:",
        f"- Affinity: {edge.get('affinity', 50)}/100",
        f"- Trust: {edge.get('trust', 50)}/100",
    ]
    summary = edge.get("summary") or ""
    if summary:
        lines.append(f"- Summary: {summary}")
    knowledge = edge.get("knowledge") or []
    if knowledge:
        lines.append(f"- What you know about {addressee_name}:")
        lines.extend(f"  * {fact}" for fact in knowledge)
    return "\n".join(lines)


def _build_activity_block(activities: list[dict]) -> str | None:
    """Render the activity snapshot, one bullet per entity.

    ``None`` entries are skipped. Each entity is labelled with
    ``_display_name`` (falling back to ``entity_id``, then ``"?"``).
    Returns None when nothing was rendered.
    """
    rendered: list[str] = []
    for activity in activities:
        if activity is None:
            continue
        label = activity.get("_display_name") or activity.get("entity_id", "?")
        parts: list[str] = []
        posture = activity.get("posture") or ""
        if posture:
            parts.append(posture)
        action = activity.get("action") or {}
        # ``action`` is only consulted when it is a mapping with a verb.
        verb = action.get("verb") if isinstance(action, dict) else None
        if verb:
            parts.append(verb)
        attention = activity.get("attention") or ""
        if attention:
            parts.append(f"attention: {attention}")
        holding = activity.get("holding") or []
        if holding:
            parts.append(f"holding: {', '.join(holding)}")
        if parts:
            rendered.append(f"- {label}: " + ", ".join(parts))
        else:
            rendered.append(f"- {label}: (no activity recorded)")
    if not rendered:
        return None
    return "ACTIVITIES:\n" + "\n".join(rendered)


def _build_scene_block(
    chat: dict, container: dict | None, scene: dict | None
) -> str | None:
    """Render the current-scene block.

    Emits the container (name and type), the chat time and the active
    scene's start time, each only when available. Returns None when none
    of them is available, so the header is never rendered on its own.
    """
    lines = ["CURRENT SCENE:"]
    if container is not None:
        lines.append(f"- Container: {container['name']} ({container['type']})")
    chat_time = chat.get("time") if chat else None
    if chat_time:
        lines.append(f"- Time: {chat_time}")
    if scene is not None and scene.get("started_at"):
        lines.append(f"- Active scene started: {scene['started_at']}")
    if len(lines) == 1:
        return None
    return "\n".join(lines)


def _format_dialogue_turn(turn: dict) -> str:
    """Render one dialogue turn as ``speaker: text``."""
    speaker = turn.get("speaker") or "?"
    text = turn.get("text") or ""
    return f"{speaker}: {text}"


def _build_dialogue_block(
    recent: list[dict],
    earlier_summary: str | None,
) -> str | None:
    """Render the recent-dialogue block.

    ``recent`` is the *kept* tail of the dialogue (already trimmed to the
    last-N turns). ``earlier_summary``, when non-empty, is rendered first
    as an ``earlier: `` line to flag elided context. Returns None when
    there is neither a turn nor a summary to show.
    """
    if not recent and not earlier_summary:
        return None
    lines = ["RECENT DIALOGUE:"]
    if earlier_summary:
        lines.append(f"earlier: {earlier_summary}")
    lines.extend(_format_dialogue_turn(turn) for turn in recent)
    return "\n".join(lines)


def _build_memories_block(memory_summaries: list[str]) -> str | None:
    """Render memory summaries as a bullet list; None when there are none."""
    if not memory_summaries:
        return None
    return "\n".join(
        ["RELEVANT MEMORIES:", *(f"- {summary}" for summary in memory_summaries)]
    )


def _build_other_edges_block(edges: list[dict]) -> str | None:
    """Render edges to entities other than the addressee.

    Each edge is labelled with ``_display_name`` (falling back to
    ``target_id``, then ``"?"``). Returns None for an empty list.
    """
    if not edges:
        return None
    lines = ["OTHER EDGES:"]
    for edge in edges:
        target = edge.get("_display_name") or edge.get("target_id", "?")
        affinity = edge.get("affinity", 50)
        trust = edge.get("trust", 50)
        lines.append(f"- {target}: affinity {affinity}/100, trust {trust}/100")
        summary = edge.get("summary") or ""
        if summary:
            lines.append(f"  summary: {summary}")
    return "\n".join(lines)


def _build_previous_scene_block(pov_summary: str | None) -> str | None:
    """Render the previous-scene summary; None when it is missing or empty."""
    if not pov_summary:
        return None
    return "PREVIOUS SCENE SUMMARY:\n" + pov_summary


def _closing_instruction(speaker_name: str, addressee_name: str) -> str:
    """Return the final instruction that tells the model how to respond."""
    return (
        f"Continue the scene as {speaker_name}, in their voice, responding "
        "naturally. Use *asterisks* for actions and quotes for dialogue. "
        f"Stay in character. Do not narrate {addressee_name}'s actions or "
        "thoughts."
    )


def _join_blocks(blocks: list[str | None]) -> str:
    """Join the non-empty blocks with blank lines between them."""
    return "\n\n".join(block for block in blocks if block)


def _earlier_summary_placeholder(elided_count: int) -> str:
    """Phase 1 placeholder. Real summarization is a downstream concern."""
    plural = "turn" if elided_count == 1 else "turns"
    return f"{elided_count} earlier {plural} elided for brevity"


def _resolve_previous_scene_summary(
    conn: Connection, chat_id: str, speaker_bot_id: str
) -> str | None:
    """Return the speaker's ``pov_summary`` of the most recently ended scene.

    Returns None when the chat has no ended scene, or when the speaker
    owns no memory row for that scene.
    """
    scene_row = conn.execute(
        "SELECT id FROM scenes WHERE chat_id = ? AND ended_at IS NOT NULL "
        "ORDER BY ended_at DESC LIMIT 1",
        (chat_id,),
    ).fetchone()
    if not scene_row:
        return None
    memory_row = conn.execute(
        "SELECT pov_summary FROM memories WHERE scene_id = ? AND owner_id = ? "
        "ORDER BY id DESC LIMIT 1",
        (scene_row[0], speaker_bot_id),
    ).fetchone()
    if not memory_row:
        return None
    return memory_row[0]


def _resolve_addressee(
    conn: Connection, addressee: str, you: dict | None
) -> tuple[str, str]:
    """Return ``(addressee_id, addressee_display_name)``.

    The function is permissive: ``addressee="you"`` resolves to the
    you-entity (display name is its authored name, falling back to
    "you" if no entity exists yet). Other ids resolve as bot ids; an
    unknown id is echoed back as its own display name.
    """
    if addressee == "you":
        return "you", (you or {}).get("name") or "you"
    bot = get_bot(conn, addressee)
    if bot is not None:
        return addressee, bot["name"]
    return addressee, addressee


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def assemble_narrative_prompt(
    conn: Connection,
    *,
    chat_id: str,
    speaker_bot_id: str,
    addressee: str = "you",
    user_turn_prose: str | None = None,
    recent_dialogue: list[dict] | None = None,
    retrieved_memory_summaries: list[str] | None = None,
    budget_soft: int = 6000,
    budget_hard: int = 8000,
    encoding_name: str = "cl100k_base",
) -> list[Message]:
    """Assemble the narrative prompt for ``speaker_bot_id`` to respond.

    Returns a list of :class:`Message` objects: one ``system`` message
    carrying the assembled context, optionally followed by a single
    ``user`` message containing ``user_turn_prose`` (when provided).

    Args:
        conn: Open database connection used for all state lookups.
        chat_id: Chat whose scene/container context is rendered.
        speaker_bot_id: Bot that will speak; its identity, edges,
            activity and memories drive the prompt.
        addressee: ``"you"`` or a bot id (see :func:`_resolve_addressee`).
        user_turn_prose: Text of the user message appended after the
            system message; ``None`` omits the user message.
        recent_dialogue: Turns (dicts with ``speaker`` / ``text``), oldest
            first. ``None`` means no dialogue.
        retrieved_memory_summaries: Memory summaries, best first. ``None``
            triggers a best-effort memory search instead; an explicit
            list (even empty) is used as-is.
        budget_soft: Token target for the system message; exceeding it
            triggers the NICE-tier trims. Values above ``budget_hard``
            are treated as ``budget_hard``.
        budget_hard: Token cap for the system message.
        encoding_name: tiktoken encoding used for counting.

    Trimming proceeds in tiers. Over ``budget_soft``: drop the previous
    scene summary, then memories beyond the top 2, then dialogue beyond
    the last 4 turns (replaced by a one-line elision placeholder). Still
    over ``budget_hard``: drop the remaining memories one at a time,
    then the other-edges block, then fall back to the bare MUST core.

    Raises:
        ValueError: If the speaker bot or the chat does not exist, or if
            the MUST-include core alone exceeds ``budget_hard``.
    """
    # NOTE(review): only the system message is budgeted; the tokens of
    # ``user_turn_prose`` are not counted against either budget. Confirm
    # against Requirements §3.2 whether the budgets are meant to cover it.
    encoding = (
        _ENCODER if encoding_name == "cl100k_base"
        else tiktoken.get_encoding(encoding_name)
    )
    # A soft target above the hard cap would let the "fits soft" exit
    # return a prompt larger than budget_hard; clamp it.
    soft_limit = min(budget_soft, budget_hard)

    bot = get_bot(conn, speaker_bot_id)
    if bot is None:
        raise ValueError(f"speaker_bot_id {speaker_bot_id!r} not found")

    chat = get_chat(conn, chat_id)
    if chat is None:
        raise ValueError(f"chat_id {chat_id!r} not found")

    you = get_you(conn)
    you_name = (you or {}).get("name") or "you"
    addressee_id, addressee_name = _resolve_addressee(conn, addressee, you)

    # ---- Build all components as text strings ------------------------------

    speaker_identity = _build_speaker_identity(bot)

    edge_to_addressee = _build_edge_block(
        get_edge(conn, speaker_bot_id, addressee_id),
        addressee_name,
    )

    # Activity for present entities. Phase 1: you + speaker bot. (When a
    # guest is added in Phase 1+, callers that know about it can pass
    # extra activities via a future hook; for now we keep it strict.)
    activities: list[dict] = []
    for entity_id, label in (("you", you_name), (speaker_bot_id, bot["name"])):
        activity = get_activity(conn, entity_id)
        if activity is not None:
            # Copy before annotating so the state layer's row is untouched.
            activity = dict(activity)
            activity["_display_name"] = label
            activities.append(activity)
    activity_block = _build_activity_block(activities)

    # Prefer the scene the chat row points at; otherwise ask for the
    # chat's active scene. The container is looked up once, from
    # whichever scene was found.
    if chat.get("active_scene_id"):
        scene = get_scene(conn, chat["active_scene_id"])
    else:
        scene = active_scene(conn, chat_id)
    container = None
    if scene and scene.get("container_id"):
        container = get_container(conn, scene["container_id"])
    scene_block = _build_scene_block(chat, container, scene)

    # Other edges: speaker → non-addressee.
    other_edges: list[dict] = []
    for edge in list_edges_for(conn, speaker_bot_id):
        target_id = edge.get("target_id")
        if target_id == addressee_id:
            continue
        # Copy before annotating, consistent with the activity rows above.
        edge = dict(edge)
        if target_id == "you":
            edge["_display_name"] = you_name
        else:
            target_bot = get_bot(conn, target_id) if target_id else None
            edge["_display_name"] = (
                target_bot["name"] if target_bot else (target_id or "?")
            )
        other_edges.append(edge)
    other_edges_block = _build_other_edges_block(other_edges)

    # Memories: caller override wins; otherwise run a memory search keyed
    # on the container name (falling back to the chat's narrative anchor)
    # as a coarse query proxy.
    if retrieved_memory_summaries is not None:
        memory_summaries = list(retrieved_memory_summaries)
    else:
        memory_summaries = []
        query = (container or {}).get("name") or chat.get("narrative_anchor") or ""
        if query:
            try:
                hits = search_memories(conn, speaker_bot_id, "host", query, k=4)
                # Skip hits without a summary instead of rendering "- None".
                memory_summaries = [
                    hit["pov_summary"] for hit in hits if hit["pov_summary"]
                ]
            except Exception:
                # Retrieval is best-effort: a failed search must not block
                # the turn, but it should leave a trace for diagnosis.
                import logging

                logging.getLogger(__name__).warning(
                    "memory search failed for speaker %r; continuing "
                    "without memories",
                    speaker_bot_id,
                    exc_info=True,
                )
                memory_summaries = []

    # Dialogue: caller override only (no event_log read in Phase 1).
    dialogue_full = list(recent_dialogue or [])

    previous_scene_summary = _resolve_previous_scene_summary(
        conn, chat_id, speaker_bot_id
    )

    closing = _closing_instruction(bot["name"], addressee_name)

    # ---- Build the MUST core ----------------------------------------------

    must_text = _join_blocks([
        speaker_identity,
        edge_to_addressee,
        scene_block,
        activity_block,
        _build_dialogue_block(
            dialogue_full[-_MUST_DIALOGUE_TURNS:], earlier_summary=None
        ),
        closing,
    ])
    must_tokens = _count_tokens(must_text, encoding)
    if must_tokens > budget_hard:
        raise ValueError(
            f"MUST-include block ({must_tokens} tokens) exceeds budget_hard "
            f"({budget_hard}). Cannot assemble prompt."
        )

    # ---- Stage SHOULD / NICE additions, trimming greedily ------------------

    def render(
        *,
        other_edges: bool,
        previous_scene: bool,
        memories_k: int,
        dialogue_keep: int,
    ) -> tuple[str, int]:
        """Build the body for one configuration; return (body, tokens).

        The whole body is rebuilt on every call so the token count
        includes join overhead. Block order follows §6.3: identity →
        edge → other edges → scene → activities → previous scene
        summary → memories → dialogue → close.
        """
        # Keep the last ``dialogue_keep`` turns verbatim; older turns
        # collapse into a single "earlier:" placeholder line.
        kept = dialogue_full[-dialogue_keep:] if dialogue_keep > 0 else []
        elided = len(dialogue_full) - len(kept)
        earlier = _earlier_summary_placeholder(elided) if elided > 0 else None
        body = _join_blocks([
            speaker_identity,
            edge_to_addressee,
            other_edges_block if other_edges else None,
            scene_block,
            activity_block,
            (
                _build_previous_scene_block(previous_scene_summary)
                if previous_scene else None
            ),
            _build_memories_block(memory_summaries[:memories_k]),
            _build_dialogue_block(kept, earlier),
            closing,
        ])
        return body, _count_tokens(body, encoding)

    # Start from the most generous configuration.
    config = {
        "other_edges": other_edges_block is not None,
        "previous_scene": bool(previous_scene_summary),
        "memories_k": min(_MAX_MEMORIES, len(memory_summaries)),
        "dialogue_keep": len(dialogue_full),  # all turns, no elision
    }
    body, total = render(**config)

    # NICE tier, applied in order while still over the soft target:
    # previous scene → memories beyond top-2 → dialogue beyond the last 4.
    nice_trims = (
        ("previous_scene", False),
        ("memories_k", min(config["memories_k"], _KEPT_MEMORIES)),
        ("dialogue_keep", min(config["dialogue_keep"], _MUST_DIALOGUE_TURNS)),
    )
    for key, trimmed_value in nice_trims:
        if total <= soft_limit:
            break
        if config[key] == trimmed_value:
            continue  # nothing to drop at this step
        config[key] = trimmed_value
        body, total = render(**config)

    # Past this point only the hard cap matters (soft_limit <= budget_hard,
    # so a body that fits soft skips every step below).
    while total > budget_hard and config["memories_k"] > 0:
        config["memories_k"] -= 1
        body, total = render(**config)

    # Drop SHOULD: other edges.
    if total > budget_hard and config["other_edges"]:
        config["other_edges"] = False
        body, total = render(**config)

    if total > budget_hard:
        # Everything optional is gone; the only thing this body has on top
        # of the MUST core is the "earlier:" elision line. The MUST core
        # was verified to fit budget_hard above, so fall back to it.
        body = must_text

    return _emit(body, user_turn_prose)


def _emit(system_body: str, user_turn_prose: str | None) -> list[Message]:
    """Wrap the body as a system message, plus a user message if given.

    An empty-string ``user_turn_prose`` still produces a user message;
    only ``None`` omits it.
    """
    msgs: list[Message] = [Message(role="system", content=system_body)]
    if user_turn_prose is not None:
        msgs.append(Message(role="user", content=user_turn_prose))
    return msgs


__all__ = ["assemble_narrative_prompt"]
"""Tests for chat.services.prompt.assemble_narrative_prompt.

Covers Task 18 — must/should/nice trim tiers (Requirements §3.2) and
the speaker prompt assembly order (§6.3). Tests use direct event-log
seeding so the projector populates state exactly the way the runtime
will at play-time. No LLM is invoked: prompt assembly is deterministic.
"""

from __future__ import annotations

import pytest

from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_event
from chat.eventlog.projector import project
import chat.state.entities  # noqa: F401  (registers handlers)
import chat.state.edges  # noqa: F401
import chat.state.memory  # noqa: F401
import chat.state.world  # noqa: F401
from chat.llm.client import Message
from chat.services.prompt import assemble_narrative_prompt


# Fixture events, in the order they are appended to the event log:
# bot, you-entity, chat, container, edge, both activities, scene.
_BASIC_EVENTS = (
    ("bot_authored", {
        "id": "bot_a",
        "name": "Aria",
        "persona": "reserved coworker who notices things",
        "voice_samples": ["I — sorry, I didn't mean to.", "Right. Of course."],
        "traits": ["introverted", "observant"],
        "backstory": "An archivist who joined the firm last spring.",
        "initial_relationship_to_you": "coworker; mild crush; never voiced",
        "kickoff_prose": "you stay late at the office",
    }),
    ("you_authored", {
        "name": "Sam",
        "pronouns": "they/them",
        "persona": "tired analyst",
    }),
    ("chat_created", {
        "id": "chat_bot_a",
        "host_bot_id": "bot_a",
        "guest_bot_id": None,
        "initial_time": "2026-04-26T20:00:00+00:00",
        "narrative_anchor": "Day 1 evening",
        "weather": "clear",
    }),
    ("container_created", {
        "chat_id": "chat_bot_a",
        "name": "office bullpen",
        "type": "workplace",
        "properties": {"public": False, "moving": False, "audible_range": "room"},
    }),
    ("edge_update", {
        "source_id": "bot_a",
        "target_id": "you",
        "affinity_delta": 12,
        "trust_delta": 5,
        "knowledge_facts": [
            "they work on the same floor",
            "they've stayed late twice this week",
        ],
    }),
    ("activity_change", {
        "entity_id": "you",
        "container_id": 1,
        "posture": "sitting at your desk",
        "action": {"verb": "finishing emails"},
        "attention": "the screen",
        "holding": ["coffee mug"],
    }),
    ("activity_change", {
        "entity_id": "bot_a",
        "container_id": 1,
        "posture": "sitting at her desk",
        "action": {"verb": "pretending to work"},
        "attention": "you, in glances",
    }),
    ("scene_opened", {
        "chat_id": "chat_bot_a",
        "container_id": 1,
        "started_at": "2026-04-26T20:00:00+00:00",
        "participants": ["you", "bot_a"],
    }),
)


def _seed_basic(conn) -> None:
    """Append the basic fixture events to the log, then project them once."""
    for event_kind, event_payload in _BASIC_EVENTS:
        append_event(conn, kind=event_kind, payload=event_payload)
    project(conn)


def _assemble_on_fresh_db(tmp_path, **prompt_kwargs):
    """Migrate a scratch DB, seed it, and assemble a prompt for ``bot_a``.

    Dialogue and memories default to empty lists unless overridden.
    """
    db_path = tmp_path / "t.db"
    apply_migrations(db_path)
    call_kwargs = {"recent_dialogue": [], "retrieved_memory_summaries": []}
    call_kwargs.update(prompt_kwargs)
    with open_db(db_path) as conn:
        _seed_basic(conn)
        return assemble_narrative_prompt(
            conn,
            chat_id="chat_bot_a",
            speaker_bot_id="bot_a",
            **call_kwargs,
        )


def test_basic_assembly_returns_system_message_with_all_must_blocks(tmp_path):
    """With no optional content, a single system message carries every MUST block."""
    messages = _assemble_on_fresh_db(tmp_path)

    assert isinstance(messages, list)
    assert len(messages) == 1
    system = messages[0]
    assert isinstance(system, Message)
    assert system.role == "system"

    text = system.content
    # Must-include markers
    for marker in ("Aria", "PERSONA", "ACTIVITIES", "CURRENT SCENE"):
        assert marker in text
    # Edge to addressee — name + numeric values (default affinity 50, +12 = 62)
    assert "Sam" in text
    assert "62/100" in text


def test_user_turn_appended_as_user_message(tmp_path):
    """``user_turn_prose`` is emitted verbatim as a trailing user message."""
    messages = _assemble_on_fresh_db(tmp_path, user_turn_prose="*looks up* Hey.")

    assert len(messages) == 2
    assert messages[0].role == "system"
    assert messages[1].role == "user"
    assert messages[1].content == "*looks up* Hey."
def _cl100k_token_count(text: str) -> int:
    """Count ``text`` with the same encoding the assembler uses by default."""
    import tiktoken

    return len(tiktoken.get_encoding("cl100k_base").encode(text))


def test_must_only_succeeds_with_empty_optional_blocks(tmp_path):
    """No dialogue, memories, other edges, or previous scene summary — should not raise."""
    db = tmp_path / "t.db"
    apply_migrations(db)
    with open_db(db) as conn:
        _seed_basic(conn)
        msgs = assemble_narrative_prompt(
            conn,
            chat_id="chat_bot_a",
            speaker_bot_id="bot_a",
            recent_dialogue=None,  # default → nothing
            # None exercises the built-in memory search fallback; the
            # seeded fixture holds no memories, so nothing is rendered.
            retrieved_memory_summaries=None,
            user_turn_prose=None,
        )
    assert len(msgs) == 1
    body = msgs[0].content
    # Must blocks present
    assert "PERSONA" in body
    assert "ACTIVITIES" in body
    # Optional blocks not in body (nothing to render)
    assert "OTHER EDGES" not in body
    assert "PREVIOUS SCENE SUMMARY" not in body
    assert "RELEVANT MEMORIES" not in body


def test_long_dialogue_keeps_last_4_verbatim_and_summarizes_earlier(tmp_path):
    """Stuff a huge dialogue history under budget pressure; older turns
    must be elided to a placeholder, the last 4 verbatim, and earlier
    unique markers gone.
    """
    db = tmp_path / "t.db"
    apply_migrations(db)
    with open_db(db) as conn:
        _seed_basic(conn)
        # Each turn carries 200 filler words (at least 200 tokens), so the
        # 20 turns together sit far above the 1200-token soft target
        # passed below and the NICE dialogue trim must kick in.
        dialogue = [
            {
                "speaker": "you" if i % 2 == 0 else "bot_a",
                "text": f"unique-line-marker-{i:02d} " + ("filler " * 200),
            }
            for i in range(20)
        ]
        msgs = assemble_narrative_prompt(
            conn,
            chat_id="chat_bot_a",
            speaker_bot_id="bot_a",
            recent_dialogue=dialogue,
            retrieved_memory_summaries=[],
            # Soft small enough to force NICE trim but hard fits MUST + 4.
            budget_soft=1200,
            budget_hard=8000,
        )
    body = msgs[0].content
    # The last 4 unique markers (16, 17, 18, 19) must be present verbatim.
    for i in range(16, 20):
        assert f"unique-line-marker-{i:02d}" in body, f"expected last-4 marker {i} in body"
    # Older markers must be dropped (replaced by elision placeholder).
    for i in range(0, 16):
        assert f"unique-line-marker-{i:02d}" not in body
    # The placeholder must account for exactly the 16 elided turns; a bare
    # substring check for "earlier" would pass on unrelated text.
    assert "earlier: 16 earlier turns elided for brevity" in body
    # Token count of system message respects hard budget.
    assert _cl100k_token_count(body) <= 8000


def test_memories_drop_to_top_2_under_budget_pressure(tmp_path):
    """4 memory summaries, each large; under tight soft budget only 2 should appear."""
    db = tmp_path / "t.db"
    apply_migrations(db)
    with open_db(db) as conn:
        _seed_basic(conn)
        # Each ~1500 tokens of repeated text; drop tier should kick in.
        long_chunk = "alpha beta gamma delta " * 400
        memories = [f"MEMORY-{tag} {long_chunk}" for tag in "ABCD"]
        msgs = assemble_narrative_prompt(
            conn,
            chat_id="chat_bot_a",
            speaker_bot_id="bot_a",
            recent_dialogue=[],
            retrieved_memory_summaries=memories,
            # Pressure: budgets that allow MUST + 2 memories but not 4.
            budget_soft=4000,
            budget_hard=5000,
        )
    body = msgs[0].content
    # MEMORY-A and MEMORY-B are the top-2 and should remain; C & D dropped.
    assert "MEMORY-A" in body
    assert "MEMORY-B" in body
    assert "MEMORY-C" not in body
    assert "MEMORY-D" not in body
    # Token count fits the hard budget.
    assert _cl100k_token_count(body) <= 5000


def test_must_exceeds_budget_hard_raises_value_error(tmp_path):
    """A hard budget smaller than the MUST core is reported, not silently shipped."""
    db = tmp_path / "t.db"
    apply_migrations(db)
    with open_db(db) as conn:
        _seed_basic(conn)
        # Match on the message: the assembler also raises ValueError for an
        # unknown bot/chat, which must not satisfy this test.
        with pytest.raises(ValueError, match="budget_hard"):
            assemble_narrative_prompt(
                conn,
                chat_id="chat_bot_a",
                speaker_bot_id="bot_a",
                recent_dialogue=[],
                retrieved_memory_summaries=[],
                budget_soft=5,
                budget_hard=10,
            )