Files
chat/tests/test_phase4_integration.py
Joseph Doherty 9987da2c07 feat: cross-chat search deep-links to turn via memories.event_id (T111.2)
Add ``m.event_id`` (T109's nullable column from migration 0014) to
``search_all_memories``'s SELECT, propagate it through the route's
template context, and have ``search.html`` build result links as
``/chats/{chat_id}#turn-{event_id}`` — matching the ``id="turn-{event_id}"``
anchor that Phase 3.5 T86 stamps on each turn DOM node so the chat page
scrolls to the originating turn on load. Memory rows projected before
the 0014 migration ran read NULL ``event_id``; the template falls back
to a chat-level link in that case so we never emit ``#turn-None``.

Pre-existing tests that asserted on the bare ``href="/chats/{chat_id}"``
contract are updated to assert on the ``href="/chats/{chat_id}#turn-``
prefix to reflect the new deep-link.
2026-04-27 05:42:17 -04:00

894 lines
33 KiB
Python

"""Phase 4 cross-feature integration tests (T97 follow-up + T101).
Cross-feature flows for the Phase 4 retrieval + branching + drawer
features. Each test drives multiple Phase 4 surfaces end-to-end and
asserts both event_log and projected-state outcomes.
Test inventory:
* ``test_post_turn_embeddings_indexed_via_worker_hook`` (T97.5) —
pins the production turn route's ``app=request.app`` plumbing so
the embedding worker actually receives jobs.
T101 additions (the "Phase 4 cross-feature integration" suite):
1. ``test_vector_retrieval_feedback_loop`` — write a memory, drain
the embedding worker, assert the vector path retrieves it.
2. ``test_branch_diverge_main_intact`` — create a branch from a
mid-log turn, switch, append more events, switch back and assert
the original log past the branch point is still present (Phase 4
branching is metadata-only — no read-side filter yet).
3. ``test_surgical_delete_truncates_log_and_writes_snapshot`` —
compute impact, confirm via the drawer route, assert the log was
truncated and a pre-rewind snapshot landed on disk.
4. ``test_hide_then_unhide_round_trip_through_read_recent_dialogue``
— flip ``hidden`` via the drawer route both directions and assert
``read_recent_dialogue`` honours the flag in real time.
5. ``test_cross_chat_search_surfaces_memories_in_three_chats`` —
write memories in 3 chats, hit ``/search?q=...`` and assert all
three appear.
The T97.5 test monkeypatches ``app.state.embedding_worker.enqueue`` to
record jobs (rather than draining the worker) because the bug it pins
is "did the call site pass ``app`` at all". T101 test 1 takes the
opposite tack: it drives the worker for real to verify the entire
write -> index -> retrieve loop.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from chat.app import app
from chat.db.connection import open_db
from chat.eventlog.log import append_and_apply, append_event
from chat.eventlog.projector import project
from chat.llm.mock import MockLLMClient
def _zero_state() -> str:
return json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
def _override_llm(canned: list[str]) -> MockLLMClient:
from chat.web.kickoff import get_llm_client
mock = MockLLMClient(canned=list(canned))
app.dependency_overrides[get_llm_client] = lambda: mock
return mock
@pytest.fixture
def app_state_setup(tmp_path, monkeypatch):
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
with TestClient(app) as c:
# The background worker is disabled so the canned-response queue
# is consumed only by the request path. The embedding worker
# stays "started" but its loop won't observe the captured
# enqueues — we replace ``enqueue`` on the worker instance below.
app.state.background_worker.enabled = False
yield c
app.dependency_overrides.clear()
def _seed(db_path: Path) -> None:
"""Mirror of ``tests/test_turn_flow.py::_seed`` — single bot + chat
+ edge + activities so the prompt assembler has something to render.
"""
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
"knowledge_facts": ["coworker"],
},
)
for entity_id, verb in [("you", "talking"), ("bot_a", "listening")]:
append_event(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
def test_post_turn_embeddings_indexed_via_worker_hook(
app_state_setup, tmp_path
):
"""POST a turn; the route must pass ``app=request.app`` into
``record_turn_memory_for_present`` so the per-witness write enqueues
an :class:`EmbeddingJob` on ``app.state.embedding_worker``.
Without the T97.5 wiring this test fails: the call site previously
omitted ``app=`` and the helper's ``app is None`` branch silently
skipped every enqueue. We monkeypatch ``enqueue`` on the live
embedding worker (rather than draining the queue mid-request) so the
assertion does not depend on asyncio scheduling inside the
TestClient — the bug is in the wiring, and the wiring is what we
pin. The drain path is covered separately in
:mod:`tests.test_embedding_worker`.
"""
_seed(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
_override_llm(
[canned_parse, "Hi there.", _zero_state(), _zero_state()]
)
captured: list = []
worker = app.state.embedding_worker
original_enqueue = worker.enqueue
worker.enqueue = captured.append # type: ignore[assignment]
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
finally:
worker.enqueue = original_enqueue # type: ignore[assignment]
app.dependency_overrides.clear()
# Single-bot turn -> one ``memory_written`` -> one EmbeddingJob.
# The job's ``memory_id`` should match the freshly-projected memory
# row, and its ``text`` should carry the assistant's narrative text.
assert len(captured) == 1
job = captured[0]
assert job.text == "Hi there."
with open_db(tmp_path / "test.db") as conn:
memory_ids = [
r[0]
for r in conn.execute(
"SELECT id FROM memories WHERE owner_id = ?",
("bot_a",),
).fetchall()
]
assert job.memory_id in memory_ids
# ---------------------------------------------------------------------------
# T101 — Phase 4 cross-feature integration suite.
# ---------------------------------------------------------------------------
#
# Helpers + the five required scenarios. Each test drives multiple Phase 4
# features so a regression in any one of them fails an integration check.
def _seed_minimal_chat(db_path: Path, chat_id: str = "chat_bot_a") -> None:
"""Seed bot_a, you, a chat, edges, and activities — same shape as
``tests/test_phase3_integration.py::_seed_single_bot_chat`` but
parameterised on chat_id so the cross-chat search test can stamp
several chats in the same database without renaming bots.
Uses ``append_and_apply`` rather than ``append_event`` + a final
``project`` so successive calls (e.g. one per chat in the
cross-chat-search test) don't try to re-project the cumulative
log and trip the ``chats.id`` UNIQUE constraint on the prior
chat's row.
"""
with open_db(db_path) as conn:
existing_bot = conn.execute(
"SELECT 1 FROM bots WHERE id = 'bot_a'"
).fetchone()
if existing_bot is None:
append_and_apply(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_and_apply(
conn,
kind="you_authored",
payload={
"name": "Me",
"pronouns": "they/them",
"persona": "",
},
)
append_and_apply(
conn,
kind="chat_created",
payload={
"id": chat_id,
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_and_apply(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": chat_id,
"knowledge_facts": [],
},
)
# Activities are unique per (entity_id) — only seed them on the
# first call (when the bot row is also fresh).
if existing_bot is None:
for entity_id, verb in [
("you", "talking"),
("bot_a", "listening"),
]:
append_and_apply(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
# ---------------------------------------------------------------------------
# 1. Vector retrieval feedback loop.
# ---------------------------------------------------------------------------
async def test_vector_retrieval_feedback_loop(tmp_path):
"""End-to-end: write a memory through
:func:`record_turn_memory_for_present` so an :class:`EmbeddingJob`
lands on a worker, drain the worker, then call
:func:`vector_search` with the SAME pseudo-embedding function and
assert the just-written memory is the top hit.
Why this test does NOT use the TestClient fixture: the live
``app.state.embedding_worker`` is created inside the FastAPI
lifespan's event loop. ``await``-ing on it from pytest-asyncio's
loop trips ``"got Future attached to a different loop"``. We
instead spin up a fresh :class:`EmbeddingWorker` in the test
loop, exactly mirroring ``tests/test_embedding_worker.py``'s
pattern. The T97.5 test above pins the wiring between the live
HTTP route and the live app worker; this test pins the
write -> index -> retrieve loop with no transport in scope.
Cross-feature gaps this test catches:
* Memory write enqueues to the worker but the worker never
drains (e.g. ``_run`` deadlock or sentinel mishandled).
* Worker uses a different embedding function than
``vector_search`` at query time, producing different vectors
and breaking cosine retrieval.
* ``embeddings`` projector handler is not registered (e.g.
import ordering bug) so the event fires but the table stays
empty.
"""
from types import SimpleNamespace
from chat.db.migrate import apply_migrations
from chat.services.embedding_worker import EmbeddingWorker
from chat.services.embeddings import generate_embedding
from chat.services.memory_write import record_turn_memory_for_present
from chat.services.vector_search import vector_search
# Trigger projector handler registration. ``record_turn_memory_for_present``
# imports memory_write which imports the worker module, but the
# projector handlers live in ``chat.state.*`` modules and are
# registered as a side effect of import.
import chat.state.embeddings # noqa: F401
import chat.state.entities # noqa: F401
import chat.state.memory # noqa: F401
import chat.state.world # noqa: F401
db = tmp_path / "test.db"
apply_migrations(db)
_seed_minimal_chat(db)
# Spin up our own worker in the test event loop. ``client=None``
# is fine for the pseudo-embedding path — the local hash function
# does not require an LLM client.
worker = EmbeddingWorker(
conn_factory=lambda: open_db(db),
client=None,
)
await worker.start()
# Stub ``app`` — only ``app.state.embedding_worker`` is read by
# ``_write_one_memory``. SimpleNamespace gives us a stand-in that
# exposes ``state.embedding_worker`` without the full FastAPI app.
fake_app = SimpleNamespace(state=SimpleNamespace(embedding_worker=worker))
distinctive_text = "Maya watched the gondola lights drift across the lagoon."
with open_db(db) as conn:
record_turn_memory_for_present(
conn,
chat_id="chat_bot_a",
host_bot_id="bot_a",
guest_bot_id=None,
narrative_text=distinctive_text,
app=fake_app,
)
# Drain the worker via the sentinel. After this returns the
# ``embedding_indexed`` event has been projected.
await worker.stop()
# Generate a query embedding using the same function the worker
# used. The pseudo-embedding is deterministic so a query equal to
# the indexed text produces the identical vector and a cosine
# similarity of 1.0.
query_result = await generate_embedding(client=None, text=distinctive_text)
with open_db(db) as conn:
emb_count = conn.execute(
"SELECT COUNT(*) FROM embeddings"
).fetchone()[0]
assert emb_count == 1, (
"embedding worker did not project an embedding_indexed event"
)
hits = vector_search(
conn,
owner_id="bot_a",
witness_role="host", # bot_a is host, witness_host=1 by default
query_vector=query_result.vector,
k=4,
)
assert len(hits) == 1
top = hits[0]
assert top["pov_summary"] == distinctive_text
# Self-match: cosine of identical vectors is 1.0.
assert top["score"] == pytest.approx(1.0, abs=1e-9)
# ---------------------------------------------------------------------------
# 2. Branch + diverge: main's post-branch tail stays intact (Phase 4
# branches are metadata-only).
# ---------------------------------------------------------------------------
def test_branch_diverge_main_intact(app_state_setup, tmp_path):
"""Append turns 1-12 on main, branch from turn 10's event_id, switch
to the new branch, append 3 more "play" turns, switch back to main,
assert the original turn 11+ events are untouched.
Phase 4's branches table is metadata-only — the read-side filter
isn't wired yet, so all events live in one log regardless of which
branch is "active". This test pins that contract: switching does
not mutate or hide existing events on either branch.
Canned LLM queue: none. ``user_turn`` / ``assistant_turn`` are
transcript-only kinds with no projector handler that needs an
LLM call, and ``branch_created`` / ``branch_switched`` are pure
state events. We use ``append_and_apply`` directly rather than
driving the HTTP turn route, which would require a 6-slot canned
queue per turn (parse + narrative + 2 state-updates + scene-close
+ memory) for 15 turns total = 90 slots of plumbing irrelevant to
the branch contract.
"""
from chat.services.branching import branch_from_event, switch_active_branch
from chat.state.branches import active_branch
db = tmp_path / "test.db"
_seed_minimal_chat(db)
# Append 12 user_turn / assistant_turn pairs on main. We collect
# the assistant_turn id at index 10 (1-based: "turn 10") so the
# branch fork point is unambiguous.
main_turn_ids: list[int] = []
with open_db(db) as conn:
for i in range(1, 13):
user_id = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": f"main turn {i}",
"segments": [],
},
)
asst_id = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": f"main reply {i}",
"truncated": False,
"user_turn_id": user_id,
},
)
main_turn_ids.append(asst_id)
turn_10_id = main_turn_ids[9]
# Snapshot the post-turn-10 main tail (turns 11, 12 + their
# user_turn predecessors) so we can byte-compare after the
# round-trip.
main_tail_before = conn.execute(
"SELECT id, kind, payload_json, hidden, superseded_by "
"FROM event_log WHERE id > ? ORDER BY id",
(turn_10_id,),
).fetchall()
assert len(main_tail_before) == 4 # 2 user + 2 assistant past turn 10
# Branch from turn 10. Phase 4's helper validates the origin
# event id exists and emits ``branch_created``.
branch_from_event(
conn,
name="experiment",
origin_event_id=turn_10_id,
chat_id="chat_bot_a",
)
switch_active_branch(conn, name="experiment")
active = active_branch(conn)
assert active is not None and active["name"] == "experiment"
# Play 3 turns on the experiment branch.
for i in range(1, 4):
user_id = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": f"experiment turn {i}",
"segments": [],
},
)
append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": f"experiment reply {i}",
"truncated": False,
"user_turn_id": user_id,
},
)
# Switch back to main.
switch_active_branch(conn, name="main")
active2 = active_branch(conn)
assert active2 is not None and active2["name"] == "main"
# Main's original tail past turn 10 is byte-identical: the
# branching events (branch_created, branch_switched x2) and the
# 3 experiment turns sit AFTER the original tail in event_log
# order, never overwriting it.
main_tail_after = conn.execute(
"SELECT id, kind, payload_json, hidden, superseded_by "
"FROM event_log "
"WHERE id > ? AND id <= ? ORDER BY id",
(turn_10_id, main_turn_ids[-1]),
).fetchall()
assert main_tail_after == main_tail_before
# The 6 experiment events (3 user + 3 assistant) all live in
# the same log past the original main tail. Verify their
# prose payloads to disambiguate from main's content.
diverged = conn.execute(
"SELECT kind, json_extract(payload_json, '$.prose'), "
" json_extract(payload_json, '$.text') "
"FROM event_log WHERE id > ? "
" AND kind IN ('user_turn', 'assistant_turn') ORDER BY id",
(main_turn_ids[-1],),
).fetchall()
assert len(diverged) == 6
prose_or_text = [(row[1] or row[2]) for row in diverged]
# Sequence: user1, asst1, user2, asst2, user3, asst3.
assert "experiment turn 1" in prose_or_text
assert "experiment reply 1" in prose_or_text
assert "experiment turn 3" in prose_or_text
assert "experiment reply 3" in prose_or_text
# ---------------------------------------------------------------------------
# 3. Surgical delete: impact preview -> confirm -> log truncated +
# pre-rewind snapshot saved.
# ---------------------------------------------------------------------------
def test_surgical_delete_truncates_log_and_writes_snapshot(
app_state_setup, tmp_path
):
"""Compute the delete-impact for a turn (read-only preview), then
confirm via the POST drawer route. Assert:
* The preview returns 200 + cascade markup.
* The event_log is physically truncated past ``target_id - 1``.
* A snapshot file lands under ``<data_dir>/snapshots/rewind/``.
* The pre-rewind snapshot's ``last_event_id`` matches the high
water mark BEFORE the truncate (so recovery can replay back to
pre-delete state).
Snapshot location: T97.5's ``data_dir`` derives from the db's
parent directory when ``CHAT_DATA_DIR`` is unset. The fixture
sets ``CHAT_DB_PATH = tmp_path / "test.db"`` so the snapshot
parent is ``tmp_path / "snapshots" / "rewind"``.
No canned LLM queue — the preview is pure SQL and the rewind path
is also pure SQL (delete + reproject). The drawer routes don't
invoke the LLM.
"""
import json as _json
db = tmp_path / "test.db"
_seed_minimal_chat(db)
# Append a small fixed turn sequence we can predict the cascade for.
with open_db(db) as conn:
first_user = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "first message",
"segments": [],
},
)
append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "first reply",
"truncated": False,
"user_turn_id": first_user,
},
)
target_user = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "this turn will be deleted",
"segments": [],
},
)
target_asst = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "and so will this reply",
"truncated": False,
"user_turn_id": target_user,
},
)
# One trailing event past the target so we can verify the
# cascade catches >1 event.
trailing = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "trailing context",
"segments": [],
},
)
max_id_before = conn.execute(
"SELECT MAX(id) FROM event_log"
).fetchone()[0]
# ---- Preview: GET delete-preview returns 200 + the cascade list. ----
preview = app_state_setup.get(
f"/chats/chat_bot_a/drawer/turn/delete-preview/{target_user}"
)
assert preview.status_code == 200
body = preview.text
assert "delete-impact-modal" in body
assert f"Delete event {target_user}?" in body
assert "user_turn" in body
assert "assistant_turn" in body
# Confirm form points at the delete route.
assert f"/drawer/turn/delete/{target_user}" in body
# ---- Confirm: POST delete drops user, assistant, AND trailing. ----
confirm = app_state_setup.post(
f"/chats/chat_bot_a/drawer/turn/delete/{target_user}"
)
assert confirm.status_code == 200
# ---- Event log truncated past target_user - 1. ----
with open_db(db) as conn:
max_id_after = conn.execute(
"SELECT MAX(id) FROM event_log"
).fetchone()[0]
# delete_turn passes ``after_event_id = target_user - 1`` so
# everything from target_user forward is gone.
assert max_id_after == target_user - 1
for ev_id in (target_user, target_asst, trailing):
row = conn.execute(
"SELECT 1 FROM event_log WHERE id = ?", (ev_id,)
).fetchone()
assert row is None, f"event {ev_id} should have been deleted"
# ---- Pre-rewind snapshot landed on disk. ----
snapshot_dir = tmp_path / "snapshots" / "rewind"
assert snapshot_dir.exists(), (
f"snapshot dir not created: {snapshot_dir}"
)
snapshots = sorted(snapshot_dir.glob("*.json"))
assert len(snapshots) >= 1, (
f"no rewind snapshot written under {snapshot_dir}"
)
# Most-recent snapshot's last_event_id == pre-truncate high water
# mark, so a "restore" path could fully reverse the delete.
latest_snapshot = snapshots[-1]
snap_data = _json.loads(latest_snapshot.read_text())
assert snap_data["last_event_id"] == max_id_before
# ---------------------------------------------------------------------------
# 4. Hide + retrieval: drawer hide drops a turn from read_recent_dialogue,
# unhide restores it.
# ---------------------------------------------------------------------------
def test_hide_then_unhide_round_trip_through_read_recent_dialogue(
app_state_setup, tmp_path
):
"""Drive a hide -> read -> unhide -> read cycle through the drawer
HTTP route and assert ``read_recent_dialogue`` flips visibility
each step. T98.3 wires the route; T55 / turn_common owns the
``hidden = 0`` filter.
Cross-feature: the drawer HTTP handler emits a ``manual_edit``
event with branch ``turn_hidden``, the manual_edit projector
flips ``event_log.hidden``, and the prompt-window reader filters
on that column. Three layers — any one breaking would fail this
test.
No canned LLM queue — hide/unhide are pure SQL routes.
"""
from chat.services.turn_common import read_recent_dialogue
db = tmp_path / "test.db"
_seed_minimal_chat(db)
with open_db(db) as conn:
user_a = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "first user line",
"segments": [],
},
)
asst_a = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "first reply",
"truncated": False,
"user_turn_id": user_a,
},
)
user_b = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "second user line",
"segments": [],
},
)
asst_b = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "second reply",
"truncated": False,
"user_turn_id": user_b,
},
)
# Baseline: all 4 turns visible.
baseline = read_recent_dialogue(conn, "chat_bot_a", limit=10)
baseline_ids = {t["event_id"] for t in baseline}
assert {user_a, asst_a, user_b, asst_b} <= baseline_ids
# ---- Hide user_b via the drawer route. ----
hide_resp = app_state_setup.post(
f"/chats/chat_bot_a/drawer/turn/hide/{user_b}",
data={"hidden": "1"},
)
assert hide_resp.status_code == 200
with open_db(db) as conn:
# event_log.hidden flipped.
row = conn.execute(
"SELECT hidden FROM event_log WHERE id = ?", (user_b,)
).fetchone()
assert int(row[0]) == 1
# read_recent_dialogue drops user_b but keeps the others.
after_hide = read_recent_dialogue(conn, "chat_bot_a", limit=10)
after_hide_ids = {t["event_id"] for t in after_hide}
assert user_b not in after_hide_ids
# The other 3 turns still surface.
assert {user_a, asst_a, asst_b} <= after_hide_ids
# ---- Unhide via the SAME route with hidden=0. ----
unhide_resp = app_state_setup.post(
f"/chats/chat_bot_a/drawer/turn/hide/{user_b}",
data={"hidden": "0"},
)
assert unhide_resp.status_code == 200
with open_db(db) as conn:
row = conn.execute(
"SELECT hidden FROM event_log WHERE id = ?", (user_b,)
).fetchone()
assert int(row[0]) == 0
# read_recent_dialogue restores user_b.
after_unhide = read_recent_dialogue(conn, "chat_bot_a", limit=10)
after_unhide_ids = {t["event_id"] for t in after_unhide}
assert {user_a, asst_a, user_b, asst_b} <= after_unhide_ids
# Two manual_edit events landed (one per toggle), each with the
# turn_hidden branch tag.
edits = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'manual_edit' "
" AND json_extract(payload_json, '$.target_kind') = 'turn_hidden' "
"ORDER BY id"
).fetchall()
assert len(edits) == 2
# ---------------------------------------------------------------------------
# 5. Cross-chat search: memories across 3 chats all surface from /search.
# ---------------------------------------------------------------------------
def test_cross_chat_search_surfaces_memories_in_three_chats(
app_state_setup, tmp_path
):
"""Seed 3 chats each owned by bot_a (so the bot row exists for the
search route's display-name hydration), write a distinctive
memory in each, then GET ``/search?q=<distinctive>`` and assert
every chat appears as a result row.
Cross-feature: T93's :func:`search_all_memories` (no per-owner
filter) + T100's HTML route (display-name hydration via
``get_bot``/``get_chat``). The route's empty-query short-circuit
is incidentally exercised by the request setup but isn't the
focus.
No canned LLM queue — memory_written events are projected directly
via ``append_and_apply`` and the search route is pure SQL +
template rendering.
"""
db = tmp_path / "test.db"
# Three chats, all hosted by bot_a so bot_a is the owner of all
# three memories. _seed_minimal_chat skips the bot/you bootstrap
# after the first call so the cumulative seed is consistent.
chat_ids = ["chat_bot_a", "chat_bot_a_2", "chat_bot_a_3"]
for chat_id in chat_ids:
_seed_minimal_chat(db, chat_id=chat_id)
# Distinctive token — "wisteria" appears nowhere else in the seed.
distinctive = "wisteria"
with open_db(db) as conn:
for idx, chat_id in enumerate(chat_ids):
append_and_apply(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": chat_id,
"pov_summary": (
f"the {distinctive} bloomed by the gate (chat {idx})"
),
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"source": "direct",
"reliability": 1.0,
"significance": 1,
"pinned": 0,
"auto_pinned": 0,
},
)
# ---- GET /search?q=wisteria -> all 3 chats appear as result rows. ----
response = app_state_setup.get(f"/search?q={distinctive}")
assert response.status_code == 200
body = response.text
# Each chat_id appears in a result link href. T111.2 deep-links to
# the originating turn so the href is now
# ``href="/chats/{chat_id}#turn-{event_id}"``; we assert on the
# ``"/chats/{chat_id}#turn-`` prefix so the per-chat link is
# uniquely matched (a bare ``"/chats/chat_bot_a`` substring would
# also match ``chat_bot_a_2`` / ``chat_bot_a_3``).
for chat_id in chat_ids:
assert f'href="/chats/{chat_id}#turn-' in body, (
f"chat {chat_id} missing from /search results: {body!r}"
)
# The owner display name (BotA) renders for each row — verify >= 3
# occurrences so we know all 3 result rows hydrated, not just 1.
assert body.count("BotA") >= 3
# ---- Sanity: distractor query yields no results. ----
distractor_response = app_state_setup.get(
"/search?q=nonexistentterm12345"
)
assert distractor_response.status_code == 200
distractor_body = distractor_response.text
# The "no matches" empty-state copy fires.
assert "No matches" in distractor_body
for chat_id in chat_ids:
assert f'href="/chats/{chat_id}#turn-' not in distractor_body