"""Phase 4 cross-feature integration tests (T97 follow-up + T101). Cross-feature flows for the Phase 4 retrieval + branching + drawer features. Each test drives multiple Phase 4 surfaces end-to-end and asserts both event_log and projected-state outcomes. Test inventory: * ``test_post_turn_embeddings_indexed_via_worker_hook`` (T97.5) — pins the production turn route's ``app=request.app`` plumbing so the embedding worker actually receives jobs. T101 additions (the "Phase 4 cross-feature integration" suite): 1. ``test_vector_retrieval_feedback_loop`` — write a memory, drain the embedding worker, assert the vector path retrieves it. 2. ``test_branch_diverge_main_intact`` — create a branch from a mid-log turn, switch, append more events, switch back and assert the original log past the branch point is still present (Phase 4 branching is metadata-only — no read-side filter yet). 3. ``test_surgical_delete_truncates_log_and_writes_snapshot`` — compute impact, confirm via the drawer route, assert the log was truncated and a pre-rewind snapshot landed on disk. 4. ``test_hide_then_unhide_round_trip_through_read_recent_dialogue`` — flip ``hidden`` via the drawer route both directions and assert ``read_recent_dialogue`` honours the flag in real time. 5. ``test_cross_chat_search_surfaces_memories_in_three_chats`` — write memories in 3 chats, hit ``/search?q=...`` and assert all three appear. The T97.5 test monkeypatches ``app.state.embedding_worker.enqueue`` to record jobs (rather than draining the worker) because the bug it pins is "did the call site pass ``app`` at all". T101 test 1 takes the opposite tack: it drives the worker for real to verify the entire write -> index -> retrieve loop. """ from __future__ import annotations import json from pathlib import Path import pytest from fastapi.testclient import TestClient from chat.app import app from chat.db.connection import open_db from chat.eventlog.log import append_and_apply, append_event from chat.eventlog.projector import project from chat.llm.mock import MockLLMClient def _zero_state() -> str: return json.dumps( {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} ) def _override_llm(canned: list[str]) -> MockLLMClient: from chat.web.kickoff import get_llm_client mock = MockLLMClient(canned=list(canned)) app.dependency_overrides[get_llm_client] = lambda: mock return mock @pytest.fixture def app_state_setup(tmp_path, monkeypatch): cfg = tmp_path / "config.toml" cfg.write_text('featherless_api_key = "test"\n') monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) db = tmp_path / "test.db" monkeypatch.setenv("CHAT_DB_PATH", str(db)) with TestClient(app) as c: # The background worker is disabled so the canned-response queue # is consumed only by the request path. The embedding worker # stays "started" but its loop won't observe the captured # enqueues — we replace ``enqueue`` on the worker instance below. app.state.background_worker.enabled = False yield c app.dependency_overrides.clear() def _seed(db_path: Path) -> None: """Mirror of ``tests/test_turn_flow.py::_seed`` — single bot + chat + edge + activities so the prompt assembler has something to render. """ with open_db(db_path) as conn: append_event( conn, kind="bot_authored", payload={ "id": "bot_a", "name": "BotA", "persona": "thoughtful, observant", "voice_samples": [], "traits": [], "backstory": "", "initial_relationship_to_you": "", "kickoff_prose": "...", }, ) append_event( conn, kind="chat_created", payload={ "id": "chat_bot_a", "host_bot_id": "bot_a", "initial_time": "2026-04-26T20:00:00+00:00", "narrative_anchor": "Day 1", "weather": "", }, ) append_event( conn, kind="edge_update", payload={ "source_id": "bot_a", "target_id": "you", "chat_id": "chat_bot_a", "knowledge_facts": ["coworker"], }, ) for entity_id, verb in [("you", "talking"), ("bot_a", "listening")]: append_event( conn, kind="activity_change", payload={ "entity_id": entity_id, "posture": "sitting", "action": { "verb": verb, "interruptible": True, "required_attention": "low", "expected_duration": "ongoing", }, "attention": "", "holding": [], "status": {}, }, ) project(conn) def test_post_turn_embeddings_indexed_via_worker_hook( app_state_setup, tmp_path ): """POST a turn; the route must pass ``app=request.app`` into ``record_turn_memory_for_present`` so the per-witness write enqueues an :class:`EmbeddingJob` on ``app.state.embedding_worker``. Without the T97.5 wiring this test fails: the call site previously omitted ``app=`` and the helper's ``app is None`` branch silently skipped every enqueue. We monkeypatch ``enqueue`` on the live embedding worker (rather than draining the queue mid-request) so the assertion does not depend on asyncio scheduling inside the TestClient — the bug is in the wiring, and the wiring is what we pin. The drain path is covered separately in :mod:`tests.test_embedding_worker`. """ _seed(tmp_path / "test.db") canned_parse = json.dumps( {"segments": [{"kind": "dialogue", "text": "hello"}]} ) _override_llm( [canned_parse, "Hi there.", _zero_state(), _zero_state()] ) captured: list = [] worker = app.state.embedding_worker original_enqueue = worker.enqueue worker.enqueue = captured.append # type: ignore[assignment] try: response = app_state_setup.post( "/chats/chat_bot_a/turns", data={"prose": "hello"} ) assert response.status_code == 204 finally: worker.enqueue = original_enqueue # type: ignore[assignment] app.dependency_overrides.clear() # Single-bot turn -> one ``memory_written`` -> one EmbeddingJob. # The job's ``memory_id`` should match the freshly-projected memory # row, and its ``text`` should carry the assistant's narrative text. assert len(captured) == 1 job = captured[0] assert job.text == "Hi there." with open_db(tmp_path / "test.db") as conn: memory_ids = [ r[0] for r in conn.execute( "SELECT id FROM memories WHERE owner_id = ?", ("bot_a",), ).fetchall() ] assert job.memory_id in memory_ids # --------------------------------------------------------------------------- # T101 — Phase 4 cross-feature integration suite. # --------------------------------------------------------------------------- # # Helpers + the five required scenarios. Each test drives multiple Phase 4 # features so a regression in any one of them fails an integration check. def _seed_minimal_chat(db_path: Path, chat_id: str = "chat_bot_a") -> None: """Seed bot_a, you, a chat, edges, and activities — same shape as ``tests/test_phase3_integration.py::_seed_single_bot_chat`` but parameterised on chat_id so the cross-chat search test can stamp several chats in the same database without renaming bots. Uses ``append_and_apply`` rather than ``append_event`` + a final ``project`` so successive calls (e.g. one per chat in the cross-chat-search test) don't try to re-project the cumulative log and trip the ``chats.id`` UNIQUE constraint on the prior chat's row. """ with open_db(db_path) as conn: existing_bot = conn.execute( "SELECT 1 FROM bots WHERE id = 'bot_a'" ).fetchone() if existing_bot is None: append_and_apply( conn, kind="bot_authored", payload={ "id": "bot_a", "name": "BotA", "persona": "thoughtful", "voice_samples": [], "traits": [], "backstory": "", "initial_relationship_to_you": "", "kickoff_prose": "...", }, ) append_and_apply( conn, kind="you_authored", payload={ "name": "Me", "pronouns": "they/them", "persona": "", }, ) append_and_apply( conn, kind="chat_created", payload={ "id": chat_id, "host_bot_id": "bot_a", "initial_time": "2026-04-26T20:00:00+00:00", "narrative_anchor": "Day 1", "weather": "", }, ) append_and_apply( conn, kind="edge_update", payload={ "source_id": "bot_a", "target_id": "you", "chat_id": chat_id, "knowledge_facts": [], }, ) # Activities are unique per (entity_id) — only seed them on the # first call (when the bot row is also fresh). if existing_bot is None: for entity_id, verb in [ ("you", "talking"), ("bot_a", "listening"), ]: append_and_apply( conn, kind="activity_change", payload={ "entity_id": entity_id, "posture": "sitting", "action": { "verb": verb, "interruptible": True, "required_attention": "low", "expected_duration": "ongoing", }, "attention": "", "holding": [], "status": {}, }, ) # --------------------------------------------------------------------------- # 1. Vector retrieval feedback loop. # --------------------------------------------------------------------------- async def test_vector_retrieval_feedback_loop(tmp_path): """End-to-end: write a memory through :func:`record_turn_memory_for_present` so an :class:`EmbeddingJob` lands on a worker, drain the worker, then call :func:`vector_search` with the SAME pseudo-embedding function and assert the just-written memory is the top hit. Why this test does NOT use the TestClient fixture: the live ``app.state.embedding_worker`` is created inside the FastAPI lifespan's event loop. ``await``-ing on it from pytest-asyncio's loop trips ``"got Future attached to a different loop"``. We instead spin up a fresh :class:`EmbeddingWorker` in the test loop, exactly mirroring ``tests/test_embedding_worker.py``'s pattern. The T97.5 test above pins the wiring between the live HTTP route and the live app worker; this test pins the write -> index -> retrieve loop with no transport in scope. Cross-feature gaps this test catches: * Memory write enqueues to the worker but the worker never drains (e.g. ``_run`` deadlock or sentinel mishandled). * Worker uses a different embedding function than ``vector_search`` at query time, producing different vectors and breaking cosine retrieval. * ``embeddings`` projector handler is not registered (e.g. import ordering bug) so the event fires but the table stays empty. """ from types import SimpleNamespace from chat.db.migrate import apply_migrations from chat.services.embedding_worker import EmbeddingWorker from chat.services.embeddings import generate_embedding from chat.services.memory_write import record_turn_memory_for_present from chat.services.vector_search import vector_search # Trigger projector handler registration. ``record_turn_memory_for_present`` # imports memory_write which imports the worker module, but the # projector handlers live in ``chat.state.*`` modules and are # registered as a side effect of import. import chat.state.embeddings # noqa: F401 import chat.state.entities # noqa: F401 import chat.state.memory # noqa: F401 import chat.state.world # noqa: F401 db = tmp_path / "test.db" apply_migrations(db) _seed_minimal_chat(db) # Spin up our own worker in the test event loop. ``client=None`` # is fine for the pseudo-embedding path — the local hash function # does not require an LLM client. worker = EmbeddingWorker( conn_factory=lambda: open_db(db), client=None, ) await worker.start() # Stub ``app`` — only ``app.state.embedding_worker`` is read by # ``_write_one_memory``. SimpleNamespace gives us a stand-in that # exposes ``state.embedding_worker`` without the full FastAPI app. fake_app = SimpleNamespace(state=SimpleNamespace(embedding_worker=worker)) distinctive_text = "Maya watched the gondola lights drift across the lagoon." with open_db(db) as conn: record_turn_memory_for_present( conn, chat_id="chat_bot_a", host_bot_id="bot_a", guest_bot_id=None, narrative_text=distinctive_text, app=fake_app, ) # Drain the worker via the sentinel. After this returns the # ``embedding_indexed`` event has been projected. await worker.stop() # Generate a query embedding using the same function the worker # used. The pseudo-embedding is deterministic so a query equal to # the indexed text produces the identical vector and a cosine # similarity of 1.0. query_result = await generate_embedding(client=None, text=distinctive_text) with open_db(db) as conn: emb_count = conn.execute( "SELECT COUNT(*) FROM embeddings" ).fetchone()[0] assert emb_count == 1, ( "embedding worker did not project an embedding_indexed event" ) hits = vector_search( conn, owner_id="bot_a", witness_role="host", # bot_a is host, witness_host=1 by default query_vector=query_result.vector, k=4, ) assert len(hits) == 1 top = hits[0] assert top["pov_summary"] == distinctive_text # Self-match: cosine of identical vectors is 1.0. assert top["score"] == pytest.approx(1.0, abs=1e-9) # --------------------------------------------------------------------------- # 2. Branch + diverge: main's post-branch tail stays intact (Phase 4 # branches are metadata-only). # --------------------------------------------------------------------------- def test_branch_diverge_main_intact(app_state_setup, tmp_path): """Append turns 1-12 on main, branch from turn 10's event_id, switch to the new branch, append 3 more "play" turns, switch back to main, assert the original turn 11+ events are untouched. Phase 4's branches table is metadata-only — the read-side filter isn't wired yet, so all events live in one log regardless of which branch is "active". This test pins that contract: switching does not mutate or hide existing events on either branch. Canned LLM queue: none. ``user_turn`` / ``assistant_turn`` are transcript-only kinds with no projector handler that needs an LLM call, and ``branch_created`` / ``branch_switched`` are pure state events. We use ``append_and_apply`` directly rather than driving the HTTP turn route, which would require a 6-slot canned queue per turn (parse + narrative + 2 state-updates + scene-close + memory) for 15 turns total = 90 slots of plumbing irrelevant to the branch contract. """ from chat.services.branching import branch_from_event, switch_active_branch from chat.state.branches import active_branch db = tmp_path / "test.db" _seed_minimal_chat(db) # Append 12 user_turn / assistant_turn pairs on main. We collect # the assistant_turn id at index 10 (1-based: "turn 10") so the # branch fork point is unambiguous. main_turn_ids: list[int] = [] with open_db(db) as conn: for i in range(1, 13): user_id = append_and_apply( conn, kind="user_turn", payload={ "chat_id": "chat_bot_a", "prose": f"main turn {i}", "segments": [], }, ) asst_id = append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "chat_bot_a", "speaker_id": "bot_a", "text": f"main reply {i}", "truncated": False, "user_turn_id": user_id, }, ) main_turn_ids.append(asst_id) turn_10_id = main_turn_ids[9] # Snapshot the post-turn-10 main tail (turns 11, 12 + their # user_turn predecessors) so we can byte-compare after the # round-trip. main_tail_before = conn.execute( "SELECT id, kind, payload_json, hidden, superseded_by " "FROM event_log WHERE id > ? ORDER BY id", (turn_10_id,), ).fetchall() assert len(main_tail_before) == 4 # 2 user + 2 assistant past turn 10 # Branch from turn 10. Phase 4's helper validates the origin # event id exists and emits ``branch_created``. branch_from_event( conn, name="experiment", origin_event_id=turn_10_id, chat_id="chat_bot_a", ) switch_active_branch(conn, name="experiment") active = active_branch(conn) assert active is not None and active["name"] == "experiment" # Play 3 turns on the experiment branch. for i in range(1, 4): user_id = append_and_apply( conn, kind="user_turn", payload={ "chat_id": "chat_bot_a", "prose": f"experiment turn {i}", "segments": [], }, ) append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "chat_bot_a", "speaker_id": "bot_a", "text": f"experiment reply {i}", "truncated": False, "user_turn_id": user_id, }, ) # Switch back to main. switch_active_branch(conn, name="main") active2 = active_branch(conn) assert active2 is not None and active2["name"] == "main" # Main's original tail past turn 10 is byte-identical: the # branching events (branch_created, branch_switched x2) and the # 3 experiment turns sit AFTER the original tail in event_log # order, never overwriting it. main_tail_after = conn.execute( "SELECT id, kind, payload_json, hidden, superseded_by " "FROM event_log " "WHERE id > ? AND id <= ? ORDER BY id", (turn_10_id, main_turn_ids[-1]), ).fetchall() assert main_tail_after == main_tail_before # The 6 experiment events (3 user + 3 assistant) all live in # the same log past the original main tail. Verify their # prose payloads to disambiguate from main's content. diverged = conn.execute( "SELECT kind, json_extract(payload_json, '$.prose'), " " json_extract(payload_json, '$.text') " "FROM event_log WHERE id > ? " " AND kind IN ('user_turn', 'assistant_turn') ORDER BY id", (main_turn_ids[-1],), ).fetchall() assert len(diverged) == 6 prose_or_text = [(row[1] or row[2]) for row in diverged] # Sequence: user1, asst1, user2, asst2, user3, asst3. assert "experiment turn 1" in prose_or_text assert "experiment reply 1" in prose_or_text assert "experiment turn 3" in prose_or_text assert "experiment reply 3" in prose_or_text # --------------------------------------------------------------------------- # 3. Surgical delete: impact preview -> confirm -> log truncated + # pre-rewind snapshot saved. # --------------------------------------------------------------------------- def test_surgical_delete_truncates_log_and_writes_snapshot( app_state_setup, tmp_path ): """Compute the delete-impact for a turn (read-only preview), then confirm via the POST drawer route. Assert: * The preview returns 200 + cascade markup. * The event_log is physically truncated past ``target_id - 1``. * A snapshot file lands under ``/snapshots/rewind/``. * The pre-rewind snapshot's ``last_event_id`` matches the high water mark BEFORE the truncate (so recovery can replay back to pre-delete state). Snapshot location: T97.5's ``data_dir`` derives from the db's parent directory when ``CHAT_DATA_DIR`` is unset. The fixture sets ``CHAT_DB_PATH = tmp_path / "test.db"`` so the snapshot parent is ``tmp_path / "snapshots" / "rewind"``. No canned LLM queue — the preview is pure SQL and the rewind path is also pure SQL (delete + reproject). The drawer routes don't invoke the LLM. """ import json as _json db = tmp_path / "test.db" _seed_minimal_chat(db) # Append a small fixed turn sequence we can predict the cascade for. with open_db(db) as conn: first_user = append_and_apply( conn, kind="user_turn", payload={ "chat_id": "chat_bot_a", "prose": "first message", "segments": [], }, ) append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "chat_bot_a", "speaker_id": "bot_a", "text": "first reply", "truncated": False, "user_turn_id": first_user, }, ) target_user = append_and_apply( conn, kind="user_turn", payload={ "chat_id": "chat_bot_a", "prose": "this turn will be deleted", "segments": [], }, ) target_asst = append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "chat_bot_a", "speaker_id": "bot_a", "text": "and so will this reply", "truncated": False, "user_turn_id": target_user, }, ) # One trailing event past the target so we can verify the # cascade catches >1 event. trailing = append_and_apply( conn, kind="user_turn", payload={ "chat_id": "chat_bot_a", "prose": "trailing context", "segments": [], }, ) max_id_before = conn.execute( "SELECT MAX(id) FROM event_log" ).fetchone()[0] # ---- Preview: GET delete-preview returns 200 + the cascade list. ---- preview = app_state_setup.get( f"/chats/chat_bot_a/drawer/turn/delete-preview/{target_user}" ) assert preview.status_code == 200 body = preview.text assert "delete-impact-modal" in body assert f"Delete event {target_user}?" in body assert "user_turn" in body assert "assistant_turn" in body # Confirm form points at the delete route. assert f"/drawer/turn/delete/{target_user}" in body # ---- Confirm: POST delete drops user, assistant, AND trailing. ---- confirm = app_state_setup.post( f"/chats/chat_bot_a/drawer/turn/delete/{target_user}" ) assert confirm.status_code == 200 # ---- Event log truncated past target_user - 1. ---- with open_db(db) as conn: max_id_after = conn.execute( "SELECT MAX(id) FROM event_log" ).fetchone()[0] # delete_turn passes ``after_event_id = target_user - 1`` so # everything from target_user forward is gone. assert max_id_after == target_user - 1 for ev_id in (target_user, target_asst, trailing): row = conn.execute( "SELECT 1 FROM event_log WHERE id = ?", (ev_id,) ).fetchone() assert row is None, f"event {ev_id} should have been deleted" # ---- Pre-rewind snapshot landed on disk. ---- snapshot_dir = tmp_path / "snapshots" / "rewind" assert snapshot_dir.exists(), ( f"snapshot dir not created: {snapshot_dir}" ) snapshots = sorted(snapshot_dir.glob("*.json")) assert len(snapshots) >= 1, ( f"no rewind snapshot written under {snapshot_dir}" ) # Most-recent snapshot's last_event_id == pre-truncate high water # mark, so a "restore" path could fully reverse the delete. latest_snapshot = snapshots[-1] snap_data = _json.loads(latest_snapshot.read_text()) assert snap_data["last_event_id"] == max_id_before # --------------------------------------------------------------------------- # 4. Hide + retrieval: drawer hide drops a turn from read_recent_dialogue, # unhide restores it. # --------------------------------------------------------------------------- def test_hide_then_unhide_round_trip_through_read_recent_dialogue( app_state_setup, tmp_path ): """Drive a hide -> read -> unhide -> read cycle through the drawer HTTP route and assert ``read_recent_dialogue`` flips visibility each step. T98.3 wires the route; T55 / turn_common owns the ``hidden = 0`` filter. Cross-feature: the drawer HTTP handler emits a ``manual_edit`` event with branch ``turn_hidden``, the manual_edit projector flips ``event_log.hidden``, and the prompt-window reader filters on that column. Three layers — any one breaking would fail this test. No canned LLM queue — hide/unhide are pure SQL routes. """ from chat.services.turn_common import read_recent_dialogue db = tmp_path / "test.db" _seed_minimal_chat(db) with open_db(db) as conn: user_a = append_and_apply( conn, kind="user_turn", payload={ "chat_id": "chat_bot_a", "prose": "first user line", "segments": [], }, ) asst_a = append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "chat_bot_a", "speaker_id": "bot_a", "text": "first reply", "truncated": False, "user_turn_id": user_a, }, ) user_b = append_and_apply( conn, kind="user_turn", payload={ "chat_id": "chat_bot_a", "prose": "second user line", "segments": [], }, ) asst_b = append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "chat_bot_a", "speaker_id": "bot_a", "text": "second reply", "truncated": False, "user_turn_id": user_b, }, ) # Baseline: all 4 turns visible. baseline = read_recent_dialogue(conn, "chat_bot_a", limit=10) baseline_ids = {t["event_id"] for t in baseline} assert {user_a, asst_a, user_b, asst_b} <= baseline_ids # ---- Hide user_b via the drawer route. ---- hide_resp = app_state_setup.post( f"/chats/chat_bot_a/drawer/turn/hide/{user_b}", data={"hidden": "1"}, ) assert hide_resp.status_code == 200 with open_db(db) as conn: # event_log.hidden flipped. row = conn.execute( "SELECT hidden FROM event_log WHERE id = ?", (user_b,) ).fetchone() assert int(row[0]) == 1 # read_recent_dialogue drops user_b but keeps the others. after_hide = read_recent_dialogue(conn, "chat_bot_a", limit=10) after_hide_ids = {t["event_id"] for t in after_hide} assert user_b not in after_hide_ids # The other 3 turns still surface. assert {user_a, asst_a, asst_b} <= after_hide_ids # ---- Unhide via the SAME route with hidden=0. ---- unhide_resp = app_state_setup.post( f"/chats/chat_bot_a/drawer/turn/hide/{user_b}", data={"hidden": "0"}, ) assert unhide_resp.status_code == 200 with open_db(db) as conn: row = conn.execute( "SELECT hidden FROM event_log WHERE id = ?", (user_b,) ).fetchone() assert int(row[0]) == 0 # read_recent_dialogue restores user_b. after_unhide = read_recent_dialogue(conn, "chat_bot_a", limit=10) after_unhide_ids = {t["event_id"] for t in after_unhide} assert {user_a, asst_a, user_b, asst_b} <= after_unhide_ids # Two manual_edit events landed (one per toggle), each with the # turn_hidden branch tag. edits = conn.execute( "SELECT payload_json FROM event_log " "WHERE kind = 'manual_edit' " " AND json_extract(payload_json, '$.target_kind') = 'turn_hidden' " "ORDER BY id" ).fetchall() assert len(edits) == 2 # --------------------------------------------------------------------------- # 5. Cross-chat search: memories across 3 chats all surface from /search. # --------------------------------------------------------------------------- def test_cross_chat_search_surfaces_memories_in_three_chats( app_state_setup, tmp_path ): """Seed 3 chats each owned by bot_a (so the bot row exists for the search route's display-name hydration), write a distinctive memory in each, then GET ``/search?q=`` and assert every chat appears as a result row. Cross-feature: T93's :func:`search_all_memories` (no per-owner filter) + T100's HTML route (display-name hydration via ``get_bot``/``get_chat``). The route's empty-query short-circuit is incidentally exercised by the request setup but isn't the focus. No canned LLM queue — memory_written events are projected directly via ``append_and_apply`` and the search route is pure SQL + template rendering. """ db = tmp_path / "test.db" # Three chats, all hosted by bot_a so bot_a is the owner of all # three memories. _seed_minimal_chat skips the bot/you bootstrap # after the first call so the cumulative seed is consistent. chat_ids = ["chat_bot_a", "chat_bot_a_2", "chat_bot_a_3"] for chat_id in chat_ids: _seed_minimal_chat(db, chat_id=chat_id) # Distinctive token — "wisteria" appears nowhere else in the seed. distinctive = "wisteria" with open_db(db) as conn: for idx, chat_id in enumerate(chat_ids): append_and_apply( conn, kind="memory_written", payload={ "owner_id": "bot_a", "chat_id": chat_id, "pov_summary": ( f"the {distinctive} bloomed by the gate (chat {idx})" ), "witness_you": 1, "witness_host": 1, "witness_guest": 0, "source": "direct", "reliability": 1.0, "significance": 1, "pinned": 0, "auto_pinned": 0, }, ) # ---- GET /search?q=wisteria -> all 3 chats appear as result rows. ---- response = app_state_setup.get(f"/search?q={distinctive}") assert response.status_code == 200 body = response.text # Each chat_id appears in a result link href, e.g. # ``href="/chats/chat_bot_a"``. The template renders one # ```` per # row, so a substring match per chat is sufficient. for chat_id in chat_ids: assert f'href="/chats/{chat_id}"' in body, ( f"chat {chat_id} missing from /search results: {body!r}" ) # The owner display name (BotA) renders for each row — verify >= 3 # occurrences so we know all 3 result rows hydrated, not just 1. assert body.count("BotA") >= 3 # ---- Sanity: distractor query yields no results. ---- distractor_response = app_state_setup.get( "/search?q=nonexistentterm12345" ) assert distractor_response.status_code == 200 distractor_body = distractor_response.text # The "no matches" empty-state copy fires. assert "No matches" in distractor_body for chat_id in chat_ids: assert f'href="/chats/{chat_id}"' not in distractor_body