Files
chat/tests/test_phase45_integration.py
2026-04-27 07:03:56 -04:00

768 lines
29 KiB
Python

"""Phase 4.5 cross-feature integration tests (T117).
End-to-end multi-feature flows specific to the Phase 4.5 changes
(T103-T114). Mirrors :mod:`tests.test_phase4_integration` in shape:
each test drives multiple Phase 4.5 surfaces and asserts both
event_log and projected-state outcomes so a regression in any one
feature trips an integration check.
Test inventory:
1. ``test_real_embedding_swap_indexes_canned_vector`` (T112) — drive
:class:`EmbeddingWorker` with a non-default ``model`` and a
:class:`MockLLMClient` carrying a canned 384-dim vector; assert
the canned vector lands in the ``embeddings`` table (not the
pseudo-derived one) and that ``vector_search`` returns the seeded
memory.
2. ``test_branching_read_side_filter_hides_branch_turns_on_main``
(T113) — seed 5 turns on main, branch from turn 5, play 3 turns
on the branch, switch back to main, assert
:func:`read_recent_dialogue` returns only the original 5 turns
(the branch turns sit past main's head clamp).
3. ``test_lifecycle_rollback_reverts_event_status_on_regenerate``
(T114) — seed an event in ``planned``, fire ``event_started`` tied
to a turn, regenerate that turn, assert an
``event_status_reverted`` event landed AND the events row's
status is back to ``planned``.
4. ``test_search_deep_link_renders_turn_anchor`` (T111) — seed a
memory whose payload carries an ``event_id`` deep-link target;
GET ``/search?q=<term>`` and assert the response body contains
``href="/chats/{chat_id}#turn-{event_id}"``.
5. ``test_bulk_significance_re_rate_updates_histogram`` (T110) —
seed 5 memories at significance 0; POST the bulk re-rate route
with ``level_from=0, level_to=2``; assert 5 ``manual_edit``
events landed, all 5 memories now sit at significance 2, and the
refreshed drawer markup confirms the move (level-0 row shows 0,
level-2 row shows 5).
"""
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from types import SimpleNamespace
import pytest
from fastapi.testclient import TestClient
from chat.app import app
from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_and_apply, append_event
from chat.eventlog.projector import project
from chat.llm.mock import MockLLMClient
# Trigger projector handler registration. Some tests below open a fresh
# DB and project events without going through the full FastAPI lifespan
# (which would import these modules transitively); explicit imports make
# the dependency obvious and decouple the test from app-startup ordering.
import chat.state.branches # noqa: F401
import chat.state.embeddings # noqa: F401
import chat.state.entities # noqa: F401
import chat.state.events # noqa: F401
import chat.state.manual_edit # noqa: F401
import chat.state.memory # noqa: F401
import chat.state.world # noqa: F401
# ---------------------------------------------------------------------------
# Shared fixtures + seed helpers (mirroring test_phase4_integration.py).
# ---------------------------------------------------------------------------
@pytest.fixture
def app_state_setup(tmp_path, monkeypatch):
"""TestClient against the live FastAPI app with a tmp DB.
Identical shape to :mod:`tests.test_phase4_integration` so the
Phase 4.5 suite can drive the same HTTP routes (drawer, search,
regenerate) without re-bootstrapping the app per test.
"""
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
with TestClient(app) as c:
# Disable the canned-response background worker so the only
# consumer of MockLLMClient queues is the request path we drive.
app.state.background_worker.enabled = False
yield c
app.dependency_overrides.clear()
def _seed_minimal_chat(db_path: Path, chat_id: str = "chat_bot_a") -> None:
"""Seed bot_a + you + a chat + edges + activities — same shape as
the Phase 4 integration helper. ``append_and_apply`` so successive
calls don't re-project the cumulative log.
"""
with open_db(db_path) as conn:
existing_bot = conn.execute(
"SELECT 1 FROM bots WHERE id = 'bot_a'"
).fetchone()
if existing_bot is None:
append_and_apply(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "thoughtful",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "...",
},
)
append_and_apply(
conn,
kind="you_authored",
payload={
"name": "Me",
"pronouns": "they/them",
"persona": "",
},
)
append_and_apply(
conn,
kind="chat_created",
payload={
"id": chat_id,
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
append_and_apply(
conn,
kind="edge_update",
payload={
"source_id": "bot_a",
"target_id": "you",
"chat_id": chat_id,
"knowledge_facts": [],
},
)
if existing_bot is None:
for entity_id, verb in [
("you", "talking"),
("bot_a", "listening"),
]:
append_and_apply(
conn,
kind="activity_change",
payload={
"entity_id": entity_id,
"posture": "sitting",
"action": {
"verb": verb,
"interruptible": True,
"required_attention": "low",
"expected_duration": "ongoing",
},
"attention": "",
"holding": [],
"status": {},
},
)
# ---------------------------------------------------------------------------
# 1. Real embedding swap (T112) — non-default model routes through
# ``client.embed`` and the canned vector lands in the embeddings table.
# ---------------------------------------------------------------------------
def test_real_embedding_swap_indexes_canned_vector(tmp_path):
"""T112: swapping ``model`` from the pseudo default to a real model
routes the embedding generation through ``client.embed`` instead of
the local hash-derived path.
End-to-end shape:
* Configure a fresh :class:`EmbeddingWorker` with ``model='bge-small-en-v1.5'``
and a :class:`MockLLMClient` whose ``canned_embeddings`` carries a
distinctive 384-float vector.
* Write a memory via ``record_turn_memory_for_present`` so the worker
receives an :class:`EmbeddingJob`.
* Drain the worker (sentinel-based stop).
* Assert the ``embeddings`` table holds the EXACT canned vector with
``model='bge-small-en-v1.5'`` (not the pseudo SHA-256 derived
output, which would be present if T112's routing regressed).
* Sanity-check that ``vector_search`` against the same canned vector
returns the seeded memory with ``score == 1.0`` (cosine self-match).
Why no FastAPI lifespan: the live ``app.state.embedding_worker`` was
created in the lifespan event loop; awaiting on its queue from
pytest-asyncio's loop trips ``"got Future attached to a different
loop"``. Mirrors the pattern in
``tests/test_phase4_integration.py::test_vector_retrieval_feedback_loop``.
"""
from chat.services.embedding_worker import EmbeddingWorker
from chat.services.memory_write import record_turn_memory_for_present
from chat.services.vector_search import vector_search
db = tmp_path / "test.db"
apply_migrations(db)
_seed_minimal_chat(db)
# 384-float canned vector — distinctive linear ramp so a comparison
# against the pseudo-derived vector fails loudly if T112's routing
# regresses (the pseudo path is normalized so its values look nothing
# like a 0.000..0.383 ramp).
canned_vector = [i / 1000.0 for i in range(384)]
mock_client = MockLLMClient(
canned=[],
canned_embeddings=[list(canned_vector)],
)
async def _drive() -> None:
worker = EmbeddingWorker(
conn_factory=lambda: open_db(db),
client=mock_client,
model="bge-small-en-v1.5", # T112: non-default routes via embed()
dim=384,
)
await worker.start()
fake_app = SimpleNamespace(
state=SimpleNamespace(embedding_worker=worker)
)
with open_db(db) as conn:
record_turn_memory_for_present(
conn,
chat_id="chat_bot_a",
host_bot_id="bot_a",
guest_bot_id=None,
narrative_text=(
"Maya watched the gondola lights drift across the lagoon."
),
app=fake_app,
)
await worker.stop()
asyncio.run(_drive())
with open_db(db) as conn:
emb_rows = conn.execute(
"SELECT memory_id, vector_json, model, dim FROM embeddings"
).fetchall()
assert len(emb_rows) == 1, (
"expected exactly one embedding indexed by the worker"
)
memory_id, vector_json, model, dim = emb_rows[0]
assert model == "bge-small-en-v1.5", (
f"expected non-default model tag, got {model!r}"
)
assert dim == 384
stored_vector = json.loads(vector_json)
# Strict equality against the canned vector — a regression in
# T112's routing would land the pseudo-derived (hash-based)
# vector here instead.
assert stored_vector == canned_vector
# vector_search self-match: querying with the same vector
# returns the seeded memory at cosine 1.0.
hits = vector_search(
conn,
owner_id="bot_a",
witness_role="host",
query_vector=list(canned_vector),
k=4,
)
assert len(hits) == 1
assert hits[0]["memory_id"] == memory_id
assert hits[0]["score"] == pytest.approx(1.0, abs=1e-9)
# ---------------------------------------------------------------------------
# 2. Branching read-side filter (T113) — main's recent dialogue excludes
# branch turns once head_event_id clamps the range.
# ---------------------------------------------------------------------------
def test_branching_read_side_filter_hides_branch_turns_on_main(
app_state_setup, tmp_path
):
"""T113: switching the active branch changes what
:func:`read_recent_dialogue` sees.
Setup:
* Seed 5 turns on main. Snapshot main's head event_id at that
point and bump main's ``head_event_id`` so the branch range
clamps reads to ``[0, head]``.
* Branch from turn 5; switch to the experiment branch; play 3
turns on it.
* Switch back to main.
Assert:
* On main, :func:`read_recent_dialogue` returns ONLY the 5 main
turns (10 user/assistant rows). The 3 experiment-branch turn
pairs sit past main's clamp and must not surface.
* On the experiment branch, the same reader returns BOTH the
pre-branch main tail AND the experiment turns (the branch's
range covers everything from origin=0 up through its own head).
Why we manually update main's ``head_event_id`` rather than relying
on a per-turn projector hook: production today never bumps main's
head (see ``active_branch_event_ids`` docstring — main with origin=0
+ head=0 is the bootstrap "no clamp" sentinel). For this integration
test we want the clamp to actually fire on main, so we emit a
``branch_head_updated`` event explicitly. This mirrors what a
future "main head tracker" would do.
"""
from chat.services.branching import (
branch_from_event,
switch_active_branch,
)
from chat.services.turn_common import read_recent_dialogue
from chat.state.branches import active_branch
db = tmp_path / "test.db"
_seed_minimal_chat(db)
main_assistant_ids: list[int] = []
with open_db(db) as conn:
for i in range(1, 6):
user_id = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": f"main turn {i}",
"segments": [],
},
)
asst_id = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": f"main reply {i}",
"truncated": False,
"user_turn_id": user_id,
},
)
main_assistant_ids.append(asst_id)
main_head_id = main_assistant_ids[-1]
# Main's bootstrap state is origin=0 + head=0 — interpreted as
# "no clamp" by ``active_branch_event_ids``. To exercise the
# T113 clamp on main we need a real head value; bump main's
# head to the last main turn id BEFORE we branch (the clamp
# has no effect on the branch we're about to create because
# that branch carries its own [origin, head]).
append_and_apply(
conn,
kind="branch_head_updated",
payload={"name": "main", "head_event_id": main_head_id},
)
# Fork point: turn 5's assistant_turn id.
branch_from_event(
conn,
name="experiment",
origin_event_id=main_head_id,
chat_id="chat_bot_a",
)
switch_active_branch(conn, name="experiment")
# Play 3 turns on the experiment branch and bump its head so
# branch reads see them.
experiment_assistant_ids: list[int] = []
for i in range(1, 4):
user_id = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": f"experiment turn {i}",
"segments": [],
},
)
asst_id = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": f"experiment reply {i}",
"truncated": False,
"user_turn_id": user_id,
},
)
experiment_assistant_ids.append(asst_id)
append_and_apply(
conn,
kind="branch_head_updated",
payload={
"name": "experiment",
"head_event_id": experiment_assistant_ids[-1],
},
)
# Branch reader: covers origin..head, so it sees BOTH main's
# pre-fork tail and the experiment turns.
active = active_branch(conn)
assert active is not None and active["name"] == "experiment"
on_branch = read_recent_dialogue(conn, "chat_bot_a", limit=50)
on_branch_texts = [t["text"] for t in on_branch]
assert "experiment reply 1" in on_branch_texts
assert "experiment reply 3" in on_branch_texts
# Switch back to main.
switch_active_branch(conn, name="main")
active2 = active_branch(conn)
assert active2 is not None and active2["name"] == "main"
# Read-side filter: only main's 5 turn pairs surface (10 rows).
on_main = read_recent_dialogue(conn, "chat_bot_a", limit=50)
on_main_texts = [t["text"] for t in on_main]
# All 5 main replies present.
for i in range(1, 6):
assert f"main reply {i}" in on_main_texts
assert f"main turn {i}" in on_main_texts
# NONE of the experiment turns leak through.
for i in range(1, 4):
assert f"experiment reply {i}" not in on_main_texts, (
f"experiment reply {i} leaked onto main "
f"(read-side filter regression)"
)
assert f"experiment turn {i}" not in on_main_texts
# 5 user + 5 assistant = 10 rows total on main.
assert len(on_main) == 10
# ---------------------------------------------------------------------------
# 3. Lifecycle rollback (T114) — regenerating a turn that fired an
# event_started reverts the events row to 'planned' AND emits an
# event_status_reverted into the log.
# ---------------------------------------------------------------------------
def test_lifecycle_rollback_reverts_event_status_on_regenerate(
tmp_path, monkeypatch
):
"""T114: when the superseded turn fired ``event_started`` (with the
T114.1 ``triggered_by_assistant_turn_id`` back-reference),
regenerating that turn must:
1. Append an ``event_status_reverted`` event with ``prior_status='planned'``.
2. Project the events row's status back to ``planned``.
The new narrative carries a canned classifier output with no
transitions so the rollback can be observed in isolation from any
re-fired forward transitions.
Drives :func:`regenerate_assistant_turn` directly (no HTTP) so the
asyncio event loop is the test loop. Mirrors the unit-test
pattern in :mod:`tests.test_regenerate`.
"""
from chat.config import Settings
from chat.services.regenerate import regenerate_assistant_turn
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
db = tmp_path / "test.db"
monkeypatch.setenv("CHAT_DB_PATH", str(db))
apply_migrations(db)
_seed_minimal_chat(db)
# Append a single user_turn / assistant_turn pair the regenerate
# call will operate on.
with open_db(db) as conn:
user_turn_id = append_and_apply(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "lights up",
"segments": [],
},
)
assistant_turn_id = append_and_apply(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "Maya nods.",
"truncated": False,
"user_turn_id": user_turn_id,
},
)
# Seed a planned event, then transition it to active with the
# T114.1 back-reference pointing at the assistant_turn we'll
# regenerate.
append_and_apply(
conn,
kind="event_planned",
payload={
"event_id": "evt_party",
"chat_id": "chat_bot_a",
"kind": "story_event",
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
append_and_apply(
conn,
kind="event_started",
payload={
"event_id": "evt_party",
"started_at": "2026-04-30T19:00:00+00:00",
"triggered_by_assistant_turn_id": assistant_turn_id,
},
)
# Sanity: the events row is currently 'active'.
status_before = conn.execute(
"SELECT status FROM events WHERE event_id = ?",
("evt_party",),
).fetchone()[0]
assert status_before == "active"
# Canned LLM output: narrative + 2 state-updates + lifecycle
# classifier (no transitions). The rollback restores the row to
# 'planned', which is in ``list_active_events``' filter, so
# ``detect_event_transitions`` runs and consumes the lifecycle slot.
state_canned = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
no_transitions = json.dumps({"transitions": []})
mock_client = MockLLMClient(
canned=[
"Maya gestures.", # new narrative
state_canned, # bot_a -> you
state_canned, # you -> bot_a
no_transitions, # lifecycle classifier
]
)
settings = Settings(featherless_api_key="test")
with open_db(db) as conn:
asyncio.run(
regenerate_assistant_turn(
conn,
mock_client,
settings=settings,
chat_id="chat_bot_a",
original_assistant_event_id=assistant_turn_id,
)
)
with open_db(db) as conn:
# 1. The event_status_reverted event lands with prior_status='planned'.
rev_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'event_status_reverted' ORDER BY id"
).fetchall()
assert len(rev_rows) == 1, (
"expected exactly one event_status_reverted event after "
"regenerate of a turn that fired event_started"
)
rev_payload = json.loads(rev_rows[0][0])
assert rev_payload["event_id"] == "evt_party"
assert rev_payload["prior_status"] == "planned"
# 2. The events row is back to 'planned' (rolled back from 'active').
status_after = conn.execute(
"SELECT status FROM events WHERE event_id = ?",
("evt_party",),
).fetchone()[0]
assert status_after == "planned"
# ---------------------------------------------------------------------------
# 4. Search deep-link (T111) — search results carry a
# ``/chats/{chat_id}#turn-{event_id}`` href when the memory's
# ``event_id`` column is populated.
# ---------------------------------------------------------------------------
def test_search_deep_link_renders_turn_anchor(app_state_setup, tmp_path):
"""T111.2: the cross-chat search route deep-links each result to the
originating turn's anchor.
Cross-feature: T109 added ``memories.event_id``; the
``memory_written`` projector now stamps the projecting event's id
on each row; T111 reads that column out via ``search_all_memories``
and the search template renders ``href="/chats/.../#turn-..."``.
Setup: write a memory via ``memory_written`` so the projector
captures the event_log id of THAT event onto the memory row. Then
GET ``/search?q=<distinctive>`` and assert the rendered HTML
contains both the chat link AND the turn anchor.
"""
db = tmp_path / "test.db"
_seed_minimal_chat(db)
distinctive = "wisteriablossom"
with open_db(db) as conn:
memory_event_id = append_and_apply(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"pov_summary": (
f"the {distinctive} bloomed by the gate"
),
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"source": "direct",
"reliability": 1.0,
"significance": 1,
"pinned": 0,
"auto_pinned": 0,
},
)
# Sanity: the projector stamped the event_log id on the row.
stored_event_id = conn.execute(
"SELECT event_id FROM memories WHERE chat_id = ? "
"AND pov_summary LIKE ?",
("chat_bot_a", f"%{distinctive}%"),
).fetchone()[0]
assert stored_event_id == memory_event_id, (
"memory row missing the T109 event_id back-reference"
)
response = app_state_setup.get(f"/search?q={distinctive}")
assert response.status_code == 200
body = response.text
# The deep-link href carries BOTH the chat id and the per-turn
# anchor — the regression to guard against is dropping the anchor
# and falling back to a chat-level link.
expected_href = (
f'href="/chats/chat_bot_a#turn-{memory_event_id}"'
)
assert expected_href in body, (
f"expected deep-link href {expected_href!r} in search response; "
f"body contained: {body!r}"
)
# ---------------------------------------------------------------------------
# 5. Bulk significance re-rate (T110.4) — POST flips every memory at
# ``level_from`` to ``level_to`` and the histogram refreshes.
# ---------------------------------------------------------------------------
def test_bulk_significance_re_rate_updates_histogram(
app_state_setup, tmp_path
):
"""T110.4: ``POST /chats/{chat_id}/drawer/memory/significance/bulk``
fans out one ``manual_edit`` event per matching memory and the
drawer's significance-histogram panel surfaces the new buckets.
Setup: seed 5 memories at significance=0 in the same chat. Sanity-
check the baseline histogram (level 0 = 5, level 2 = 0).
Action: POST ``level_from=0, level_to=2``.
Assert:
* Response 200 (the route returns the refreshed drawer partial).
* 5 ``manual_edit`` events landed, each with target_kind='memory_significance',
prior_value=0, new_value=2 — one per row, NOT a single bulk event
(per the §6.4 audit-trail design).
* All 5 memories in the database now sit at significance=2.
* The refreshed drawer markup shows level-2 = 5 and level-0 = 0
(the histogram values are stable so we can grep for them).
"""
db = tmp_path / "test.db"
_seed_minimal_chat(db)
# Seed 5 memories at significance=0.
with open_db(db) as conn:
for idx in range(5):
append_and_apply(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"pov_summary": f"baseline memory {idx}",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"source": "direct",
"reliability": 1.0,
"significance": 0, # all start at 0 for the bulk move.
"pinned": 0,
"auto_pinned": 0,
},
)
# Sanity: 5 rows at level 0 going in.
baseline = conn.execute(
"SELECT significance, COUNT(*) FROM memories "
"WHERE chat_id = ? GROUP BY significance",
("chat_bot_a",),
).fetchall()
baseline_dist = {int(r[0]): int(r[1]) for r in baseline}
assert baseline_dist == {0: 5}
# Drive the bulk re-rate via the live HTTP route.
response = app_state_setup.post(
"/chats/chat_bot_a/drawer/memory/significance/bulk",
data={"level_from": "0", "level_to": "2"},
)
assert response.status_code == 200
body = response.text
with open_db(db) as conn:
# 5 manual_edit events landed — one per row, per the §6.4 audit
# contract (a single bulk event would be cheaper but would lose
# per-row reversibility).
edit_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'manual_edit' "
" AND json_extract(payload_json, '$.target_kind') = "
" 'memory_significance' "
"ORDER BY id"
).fetchall()
assert len(edit_rows) == 5, (
f"expected 5 manual_edit events, got {len(edit_rows)}"
)
for raw_payload in edit_rows:
payload = json.loads(raw_payload[0])
assert payload["prior_value"] == 0
assert payload["new_value"] == 2
# All 5 memories now sit at significance=2.
post_dist = {
int(r[0]): int(r[1])
for r in conn.execute(
"SELECT significance, COUNT(*) FROM memories "
"WHERE chat_id = ? GROUP BY significance",
("chat_bot_a",),
).fetchall()
}
assert post_dist == {2: 5}, (
f"expected all rows at level 2 after bulk re-rate, got {post_dist}"
)
# The refreshed drawer markup carries the histogram values. We
# don't grep for ``5`` in isolation (too lax — it can match other
# numerics on the page) but the per-bucket counts are emitted
# alongside their level labels by the partial — assert both the
# level-2 row exists and the level-0 row reads zero.
# The drawer template surfaces ``significance_distribution`` keys
# 0..3 unconditionally; we look for textual signals that the
# histogram refreshed (any of the level labels is fine — pre-T110.4
# the data wasn't changing on this route, post-T110.4 it does).
assert body, "drawer route returned empty body"