Files
chat/tests/test_phase4_integration.py
2026-04-27 04:08:25 -04:00

892 lines
33 KiB
Python

"""Phase 4 cross-feature integration tests (T97 follow-up + T101).
Cross-feature flows for the Phase 4 retrieval + branching + drawer
features. Each test drives multiple Phase 4 surfaces end-to-end and
asserts both event_log and projected-state outcomes.
Test inventory:
* ``test_post_turn_embeddings_indexed_via_worker_hook`` (T97.5) —
pins the production turn route's ``app=request.app`` plumbing so
the embedding worker actually receives jobs.
T101 additions (the "Phase 4 cross-feature integration" suite):
1. ``test_vector_retrieval_feedback_loop`` — write a memory, drain
the embedding worker, assert the vector path retrieves it.
2. ``test_branch_diverge_main_intact`` — create a branch from a
mid-log turn, switch, append more events, switch back and assert
the original log past the branch point is still present (Phase 4
branching is metadata-only — no read-side filter yet).
3. ``test_surgical_delete_truncates_log_and_writes_snapshot`` —
compute impact, confirm via the drawer route, assert the log was
truncated and a pre-rewind snapshot landed on disk.
4. ``test_hide_then_unhide_round_trip_through_read_recent_dialogue``
— flip ``hidden`` via the drawer route both directions and assert
``read_recent_dialogue`` honours the flag in real time.
5. ``test_cross_chat_search_surfaces_memories_in_three_chats`` —
write memories in 3 chats, hit ``/search?q=...`` and assert all
three appear.
The T97.5 test monkeypatches ``app.state.embedding_worker.enqueue`` to
record jobs (rather than draining the worker) because the bug it pins
is "did the call site pass ``app`` at all". T101 test 1 takes the
opposite tack: it drives the worker for real to verify the entire
write -> index -> retrieve loop.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from chat.app import app
from chat.db.connection import open_db
from chat.eventlog.log import append_and_apply, append_event
from chat.eventlog.projector import project
from chat.llm.mock import MockLLMClient
def _zero_state() -> str:
return json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
def _override_llm(canned: list[str]) -> MockLLMClient:
from chat.web.kickoff import get_llm_client
mock = MockLLMClient(canned=list(canned))
app.dependency_overrides[get_llm_client] = lambda: mock
return mock
@pytest.fixture
def app_state_setup(tmp_path, monkeypatch):
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
with TestClient(app) as c:
# The background worker is disabled so the canned-response queue
# is consumed only by the request path. The embedding worker
# stays "started" but its loop won't observe the captured
# enqueues — we replace ``enqueue`` on the worker instance below.
app.state.background_worker.enabled = False
yield c
app.dependency_overrides.clear()
def _seed(db_path: Path) -> None:
"""Mirror of ``tests/test_turn_flow.py::_seed`` — single bot + chat
+ edge + activities so the prompt assembler has something to render.
"""
with open_db(db_path) as conn:
append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": "chat_bot_a",
"knowledge_facts": ["coworker"],
},
)
for entity_id, verb in [("you", "talking"), ("bot_a", "listening")]:
append_event(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
project(conn)
def test_post_turn_embeddings_indexed_via_worker_hook(
app_state_setup, tmp_path
):
"""POST a turn; the route must pass ``app=request.app`` into
``record_turn_memory_for_present`` so the per-witness write enqueues
an :class:`EmbeddingJob` on ``app.state.embedding_worker``.
Without the T97.5 wiring this test fails: the call site previously
omitted ``app=`` and the helper's ``app is None`` branch silently
skipped every enqueue. We monkeypatch ``enqueue`` on the live
embedding worker (rather than draining the queue mid-request) so the
assertion does not depend on asyncio scheduling inside the
TestClient — the bug is in the wiring, and the wiring is what we
pin. The drain path is covered separately in
:mod:`tests.test_embedding_worker`.
"""
_seed(tmp_path / "test.db")
canned_parse = json.dumps(
{"segments": [{"kind": "dialogue", "text": "hello"}]}
)
_override_llm(
[canned_parse, "Hi there.", _zero_state(), _zero_state()]
)
captured: list = []
worker = app.state.embedding_worker
original_enqueue = worker.enqueue
worker.enqueue = captured.append # type: ignore[assignment]
try:
response = app_state_setup.post(
"/chats/chat_bot_a/turns", data={"prose": "hello"}
)
assert response.status_code == 204
finally:
worker.enqueue = original_enqueue # type: ignore[assignment]
app.dependency_overrides.clear()
# Single-bot turn -> one ``memory_written`` -> one EmbeddingJob.
# The job's ``memory_id`` should match the freshly-projected memory
# row, and its ``text`` should carry the assistant's narrative text.
assert len(captured) == 1
job = captured[0]
assert job.text == "Hi there."
with open_db(tmp_path / "test.db") as conn:
memory_ids = [
r[0]
for r in conn.execute(
"SELECT id FROM memories WHERE owner_id = ?",
("bot_a",),
).fetchall()
]
assert job.memory_id in memory_ids
# ---------------------------------------------------------------------------
# T101 — Phase 4 cross-feature integration suite.
# ---------------------------------------------------------------------------
#
# Helpers + the five required scenarios. Each test drives multiple Phase 4
# features so a regression in any one of them fails an integration check.
def _seed_minimal_chat(db_path: Path, chat_id: str = "chat_bot_a") -> None:
"""Seed bot_a, you, a chat, edges, and activities — same shape as
``tests/test_phase3_integration.py::_seed_single_bot_chat`` but
parameterised on chat_id so the cross-chat search test can stamp
several chats in the same database without renaming bots.
Uses ``append_and_apply`` rather than ``append_event`` + a final
``project`` so successive calls (e.g. one per chat in the
cross-chat-search test) don't try to re-project the cumulative
log and trip the ``chats.id`` UNIQUE constraint on the prior
chat's row.
"""
with open_db(db_path) as conn:
existing_bot = conn.execute(
"SELECT 1 FROM bots WHERE id = 'bot_a'"
).fetchone()
if existing_bot is None:
append_and_apply(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_and_apply(
conn,
kind="you_authored",
payload={
"name": "Me",
"pronouns": "they/them",
"persona": "",
},
)
append_and_apply(
conn,
kind="chat_created",
payload={
"id": chat_id,
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_and_apply(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": chat_id,
"knowledge_facts": [],
},
)
# Activities are unique per (entity_id) — only seed them on the
# first call (when the bot row is also fresh).
if existing_bot is None:
for entity_id, verb in [
("you", "talking"),
("bot_a", "listening"),
]:
append_and_apply(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
# ---------------------------------------------------------------------------
# 1. Vector retrieval feedback loop.
# ---------------------------------------------------------------------------
async def test_vector_retrieval_feedback_loop(tmp_path):
"""End-to-end: write a memory through
:func:`record_turn_memory_for_present` so an :class:`EmbeddingJob`
lands on a worker, drain the worker, then call
:func:`vector_search` with the SAME pseudo-embedding function and
assert the just-written memory is the top hit.
Why this test does NOT use the TestClient fixture: the live
``app.state.embedding_worker`` is created inside the FastAPI
lifespan's event loop. ``await``-ing on it from pytest-asyncio's
loop trips ``"got Future attached to a different loop"``. We
instead spin up a fresh :class:`EmbeddingWorker` in the test
loop, exactly mirroring ``tests/test_embedding_worker.py``'s
pattern. The T97.5 test above pins the wiring between the live
HTTP route and the live app worker; this test pins the
write -> index -> retrieve loop with no transport in scope.
Cross-feature gaps this test catches:
* Memory write enqueues to the worker but the worker never
drains (e.g. ``_run`` deadlock or sentinel mishandled).
* Worker uses a different embedding function than
``vector_search`` at query time, producing different vectors
and breaking cosine retrieval.
* ``embeddings`` projector handler is not registered (e.g.
import ordering bug) so the event fires but the table stays
empty.
"""
from types import SimpleNamespace
from chat.db.migrate import apply_migrations
from chat.services.embedding_worker import EmbeddingWorker
from chat.services.embeddings import generate_embedding
from chat.services.memory_write import record_turn_memory_for_present
from chat.services.vector_search import vector_search
# Trigger projector handler registration. ``record_turn_memory_for_present``
# imports memory_write which imports the worker module, but the
# projector handlers live in ``chat.state.*`` modules and are
# registered as a side effect of import.
import chat.state.embeddings # noqa: F401
import chat.state.entities # noqa: F401
import chat.state.memory # noqa: F401
import chat.state.world # noqa: F401
db = tmp_path / "test.db"
apply_migrations(db)
_seed_minimal_chat(db)
# Spin up our own worker in the test event loop. ``client=None``
# is fine for the pseudo-embedding path — the local hash function
# does not require an LLM client.
worker = EmbeddingWorker(
conn_factory=lambda: open_db(db),
client=None,
)
await worker.start()
# Stub ``app`` — only ``app.state.embedding_worker`` is read by
# ``_write_one_memory``. SimpleNamespace gives us a stand-in that
# exposes ``state.embedding_worker`` without the full FastAPI app.
fake_app = SimpleNamespace(state=SimpleNamespace(embedding_worker=worker))
distinctive_text = "Maya watched the gondola lights drift across the lagoon."
with open_db(db) as conn:
record_turn_memory_for_present(
conn,
chat_id="chat_bot_a",
host_bot_id="bot_a",
guest_bot_id=None,
narrative_text=distinctive_text,
app=fake_app,
)
# Drain the worker via the sentinel. After this returns the
# ``embedding_indexed`` event has been projected.
await worker.stop()
# Generate a query embedding using the same function the worker
# used. The pseudo-embedding is deterministic so a query equal to
# the indexed text produces the identical vector and a cosine
# similarity of 1.0.
query_result = await generate_embedding(client=None, text=distinctive_text)
with open_db(db) as conn:
emb_count = conn.execute(
"SELECT COUNT(*) FROM embeddings"
).fetchone()[0]
assert emb_count == 1, (
"embedding worker did not project an embedding_indexed event"
)
hits = vector_search(
conn,
owner_id="bot_a",
witness_role="host", # bot_a is host, witness_host=1 by default
query_vector=query_result.vector,
k=4,
)
assert len(hits) == 1
top = hits[0]
assert top["pov_summary"] == distinctive_text
# Self-match: cosine of identical vectors is 1.0.
assert top["score"] == pytest.approx(1.0, abs=1e-9)
# ---------------------------------------------------------------------------
# 2. Branch + diverge: main's post-branch tail stays intact (Phase 4
# branches are metadata-only).
# ---------------------------------------------------------------------------
def test_branch_diverge_main_intact(app_state_setup, tmp_path):
"""Append turns 1-12 on main, branch from turn 10's event_id, switch
to the new branch, append 3 more "play" turns, switch back to main,
assert the original turn 11+ events are untouched.
Phase 4's branches table is metadata-only — the read-side filter
isn't wired yet, so all events live in one log regardless of which
branch is "active". This test pins that contract: switching does
not mutate or hide existing events on either branch.
Canned LLM queue: none. ``user_turn`` / ``assistant_turn`` are
transcript-only kinds with no projector handler that needs an
LLM call, and ``branch_created`` / ``branch_switched`` are pure
state events. We use ``append_and_apply`` directly rather than
driving the HTTP turn route, which would require a 6-slot canned
queue per turn (parse + narrative + 2 state-updates + scene-close
+ memory) for 15 turns total = 90 slots of plumbing irrelevant to
the branch contract.
"""
from chat.services.branching import branch_from_event, switch_active_branch
from chat.state.branches import active_branch
db = tmp_path / "test.db"
_seed_minimal_chat(db)
# Append 12 user_turn / assistant_turn pairs on main. We collect
# the assistant_turn id at index 10 (1-based: "turn 10") so the
# branch fork point is unambiguous.
main_turn_ids: list[int] = []
with open_db(db) as conn:
for i in range(1, 13):
user_id = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": f"main turn {i}",
"segments": [],
},
)
asst_id = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": f"main reply {i}",
"truncated": False,
"user_turn_id": user_id,
},
)
main_turn_ids.append(asst_id)
turn_10_id = main_turn_ids[9]
# Snapshot the post-turn-10 main tail (turns 11, 12 + their
# user_turn predecessors) so we can byte-compare after the
# round-trip.
main_tail_before = conn.execute(
"SELECT id, kind, payload_json, hidden, superseded_by "
"FROM event_log WHERE id > ? ORDER BY id",
(turn_10_id,),
).fetchall()
assert len(main_tail_before) == 4 # 2 user + 2 assistant past turn 10
# Branch from turn 10. Phase 4's helper validates the origin
# event id exists and emits ``branch_created``.
branch_from_event(
conn,
name="experiment",
origin_event_id=turn_10_id,
chat_id="chat_bot_a",
)
switch_active_branch(conn, name="experiment")
active = active_branch(conn)
assert active is not None and active["name"] == "experiment"
# Play 3 turns on the experiment branch.
for i in range(1, 4):
user_id = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": f"experiment turn {i}",
"segments": [],
},
)
append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": f"experiment reply {i}",
"truncated": False,
"user_turn_id": user_id,
},
)
# Switch back to main.
switch_active_branch(conn, name="main")
active2 = active_branch(conn)
assert active2 is not None and active2["name"] == "main"
# Main's original tail past turn 10 is byte-identical: the
# branching events (branch_created, branch_switched x2) and the
# 3 experiment turns sit AFTER the original tail in event_log
# order, never overwriting it.
main_tail_after = conn.execute(
"SELECT id, kind, payload_json, hidden, superseded_by "
"FROM event_log "
"WHERE id > ? AND id <= ? ORDER BY id",
(turn_10_id, main_turn_ids[-1]),
).fetchall()
assert main_tail_after == main_tail_before
# The 6 experiment events (3 user + 3 assistant) all live in
# the same log past the original main tail. Verify their
# prose payloads to disambiguate from main's content.
diverged = conn.execute(
"SELECT kind, json_extract(payload_json, '$.prose'), "
" json_extract(payload_json, '$.text') "
"FROM event_log WHERE id > ? "
" AND kind IN ('user_turn', 'assistant_turn') ORDER BY id",
(main_turn_ids[-1],),
).fetchall()
assert len(diverged) == 6
prose_or_text = [(row[1] or row[2]) for row in diverged]
# Sequence: user1, asst1, user2, asst2, user3, asst3.
assert "experiment turn 1" in prose_or_text
assert "experiment reply 1" in prose_or_text
assert "experiment turn 3" in prose_or_text
assert "experiment reply 3" in prose_or_text
# ---------------------------------------------------------------------------
# 3. Surgical delete: impact preview -> confirm -> log truncated +
# pre-rewind snapshot saved.
# ---------------------------------------------------------------------------
def test_surgical_delete_truncates_log_and_writes_snapshot(
app_state_setup, tmp_path
):
"""Compute the delete-impact for a turn (read-only preview), then
confirm via the POST drawer route. Assert:
* The preview returns 200 + cascade markup.
* The event_log is physically truncated past ``target_id - 1``.
* A snapshot file lands under ``<data_dir>/snapshots/rewind/``.
* The pre-rewind snapshot's ``last_event_id`` matches the high
water mark BEFORE the truncate (so recovery can replay back to
pre-delete state).
Snapshot location: T97.5's ``data_dir`` derives from the db's
parent directory when ``CHAT_DATA_DIR`` is unset. The fixture
sets ``CHAT_DB_PATH = tmp_path / "test.db"`` so the snapshot
parent is ``tmp_path / "snapshots" / "rewind"``.
No canned LLM queue — the preview is pure SQL and the rewind path
is also pure SQL (delete + reproject). The drawer routes don't
invoke the LLM.
"""
import json as _json
db = tmp_path / "test.db"
_seed_minimal_chat(db)
# Append a small fixed turn sequence we can predict the cascade for.
with open_db(db) as conn:
first_user = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "first message",
"segments": [],
},
)
append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "first reply",
"truncated": False,
"user_turn_id": first_user,
},
)
target_user = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "this turn will be deleted",
"segments": [],
},
)
target_asst = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "and so will this reply",
"truncated": False,
"user_turn_id": target_user,
},
)
# One trailing event past the target so we can verify the
# cascade catches >1 event.
trailing = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "trailing context",
"segments": [],
},
)
max_id_before = conn.execute(
"SELECT MAX(id) FROM event_log"
).fetchone()[0]
# ---- Preview: GET delete-preview returns 200 + the cascade list. ----
preview = app_state_setup.get(
f"/chats/chat_bot_a/drawer/turn/delete-preview/{target_user}"
)
assert preview.status_code == 200
body = preview.text
assert "delete-impact-modal" in body
assert f"Delete event {target_user}?" in body
assert "user_turn" in body
assert "assistant_turn" in body
# Confirm form points at the delete route.
assert f"/drawer/turn/delete/{target_user}" in body
# ---- Confirm: POST delete drops user, assistant, AND trailing. ----
confirm = app_state_setup.post(
f"/chats/chat_bot_a/drawer/turn/delete/{target_user}"
)
assert confirm.status_code == 200
# ---- Event log truncated past target_user - 1. ----
with open_db(db) as conn:
max_id_after = conn.execute(
"SELECT MAX(id) FROM event_log"
).fetchone()[0]
# delete_turn passes ``after_event_id = target_user - 1`` so
# everything from target_user forward is gone.
assert max_id_after == target_user - 1
for ev_id in (target_user, target_asst, trailing):
row = conn.execute(
"SELECT 1 FROM event_log WHERE id = ?", (ev_id,)
).fetchone()
assert row is None, f"event {ev_id} should have been deleted"
# ---- Pre-rewind snapshot landed on disk. ----
snapshot_dir = tmp_path / "snapshots" / "rewind"
assert snapshot_dir.exists(), (
f"snapshot dir not created: {snapshot_dir}"
)
snapshots = sorted(snapshot_dir.glob("*.json"))
assert len(snapshots) >= 1, (
f"no rewind snapshot written under {snapshot_dir}"
)
# Most-recent snapshot's last_event_id == pre-truncate high water
# mark, so a "restore" path could fully reverse the delete.
latest_snapshot = snapshots[-1]
snap_data = _json.loads(latest_snapshot.read_text())
assert snap_data["last_event_id"] == max_id_before
# ---------------------------------------------------------------------------
# 4. Hide + retrieval: drawer hide drops a turn from read_recent_dialogue,
# unhide restores it.
# ---------------------------------------------------------------------------
def test_hide_then_unhide_round_trip_through_read_recent_dialogue(
app_state_setup, tmp_path
):
"""Drive a hide -> read -> unhide -> read cycle through the drawer
HTTP route and assert ``read_recent_dialogue`` flips visibility
each step. T98.3 wires the route; T55 / turn_common owns the
``hidden = 0`` filter.
Cross-feature: the drawer HTTP handler emits a ``manual_edit``
event with branch ``turn_hidden``, the manual_edit projector
flips ``event_log.hidden``, and the prompt-window reader filters
on that column. Three layers — any one breaking would fail this
test.
No canned LLM queue — hide/unhide are pure SQL routes.
"""
from chat.services.turn_common import read_recent_dialogue
db = tmp_path / "test.db"
_seed_minimal_chat(db)
with open_db(db) as conn:
user_a = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "first user line",
"segments": [],
},
)
asst_a = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "first reply",
"truncated": False,
"user_turn_id": user_a,
},
)
user_b = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "second user line",
"segments": [],
},
)
asst_b = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "second reply",
"truncated": False,
"user_turn_id": user_b,
},
)
# Baseline: all 4 turns visible.
baseline = read_recent_dialogue(conn, "chat_bot_a", limit=10)
baseline_ids = {t["event_id"] for t in baseline}
assert {user_a, asst_a, user_b, asst_b} <= baseline_ids
# ---- Hide user_b via the drawer route. ----
hide_resp = app_state_setup.post(
f"/chats/chat_bot_a/drawer/turn/hide/{user_b}",
data={"hidden": "1"},
)
assert hide_resp.status_code == 200
with open_db(db) as conn:
# event_log.hidden flipped.
row = conn.execute(
"SELECT hidden FROM event_log WHERE id = ?", (user_b,)
).fetchone()
assert int(row[0]) == 1
# read_recent_dialogue drops user_b but keeps the others.
after_hide = read_recent_dialogue(conn, "chat_bot_a", limit=10)
after_hide_ids = {t["event_id"] for t in after_hide}
assert user_b not in after_hide_ids
# The other 3 turns still surface.
assert {user_a, asst_a, asst_b} <= after_hide_ids
# ---- Unhide via the SAME route with hidden=0. ----
unhide_resp = app_state_setup.post(
f"/chats/chat_bot_a/drawer/turn/hide/{user_b}",
data={"hidden": "0"},
)
assert unhide_resp.status_code == 200
with open_db(db) as conn:
row = conn.execute(
"SELECT hidden FROM event_log WHERE id = ?", (user_b,)
).fetchone()
assert int(row[0]) == 0
# read_recent_dialogue restores user_b.
after_unhide = read_recent_dialogue(conn, "chat_bot_a", limit=10)
after_unhide_ids = {t["event_id"] for t in after_unhide}
assert {user_a, asst_a, user_b, asst_b} <= after_unhide_ids
# Two manual_edit events landed (one per toggle), each with the
# turn_hidden branch tag.
edits = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'manual_edit' "
" AND json_extract(payload_json, '$.target_kind') = 'turn_hidden' "
"ORDER BY id"
).fetchall()
assert len(edits) == 2
# ---------------------------------------------------------------------------
# 5. Cross-chat search: memories across 3 chats all surface from /search.
# ---------------------------------------------------------------------------
def test_cross_chat_search_surfaces_memories_in_three_chats(
app_state_setup, tmp_path
):
"""Seed 3 chats each owned by bot_a (so the bot row exists for the
search route's display-name hydration), write a distinctive
memory in each, then GET ``/search?q=<distinctive>`` and assert
every chat appears as a result row.
Cross-feature: T93's :func:`search_all_memories` (no per-owner
filter) + T100's HTML route (display-name hydration via
``get_bot``/``get_chat``). The route's empty-query short-circuit
is incidentally exercised by the request setup but isn't the
focus.
No canned LLM queue — memory_written events are projected directly
via ``append_and_apply`` and the search route is pure SQL +
template rendering.
"""
db = tmp_path / "test.db"
# Three chats, all hosted by bot_a so bot_a is the owner of all
# three memories. _seed_minimal_chat skips the bot/you bootstrap
# after the first call so the cumulative seed is consistent.
chat_ids = ["chat_bot_a", "chat_bot_a_2", "chat_bot_a_3"]
for chat_id in chat_ids:
_seed_minimal_chat(db, chat_id=chat_id)
# Distinctive token — "wisteria" appears nowhere else in the seed.
distinctive = "wisteria"
with open_db(db) as conn:
for idx, chat_id in enumerate(chat_ids):
append_and_apply(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": chat_id,
"pov_summary": (
f"the {distinctive} bloomed by the gate (chat {idx})"
),
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"source": "direct",
"reliability": 1.0,
"significance": 1,
"pinned": 0,
"auto_pinned": 0,
},
)
# ---- GET /search?q=wisteria -> all 3 chats appear as result rows. ----
response = app_state_setup.get(f"/search?q={distinctive}")
assert response.status_code == 200
body = response.text
# Each chat_id appears in a result link href, e.g.
# ``href="/chats/chat_bot_a"``. The template renders one
# ``<a class="search-result-link" href="/chats/{chat_id}">`` per
# row, so a substring match per chat is sufficient.
for chat_id in chat_ids:
assert f'href="/chats/{chat_id}"' in body, (
f"chat {chat_id} missing from /search results: {body!r}"
)
# The owner display name (BotA) renders for each row — verify >= 3
# occurrences so we know all 3 result rows hydrated, not just 1.
assert body.count("BotA") >= 3
# ---- Sanity: distractor query yields no results. ----
distractor_response = app_state_setup.get(
"/search?q=nonexistentterm12345"
)
assert distractor_response.status_code == 200
distractor_body = distractor_response.text
# The "no matches" empty-state copy fires.
assert "No matches" in distractor_body
for chat_id in chat_ids:
assert f'href="/chats/{chat_id}"' not in distractor_body