Files
chat/tests/test_drawer_phase4.py
T
Joseph Doherty f3827706df fix: drawer delete_turn guards event_id <= 0 (T110.1)
A stale tab or hand-crafted request posting event_id=0 to the surgical
delete route would compute after_event_id=-1 and silently truncate the
entire log. Now rejected with 400.

SQLite assigns event_log ids starting at 1, so any legitimate id is
always >= 1 — non-positive values can only indicate a client bug.

Test: tests/test_drawer_phase4.py::test_delete_turn_with_event_id_zero_returns_400.
2026-04-27 05:11:39 -04:00

547 lines
18 KiB
Python

"""T98 (Phase 4): drawer phase-4 bundle.
Five sub-features extending the chat drawer:
* T98.1 — branching UI (create / switch / from-turn).
* T98.2 — significance-review panel (distribution + significance edits).
* T98.3 — hide-from-view toggle (per-turn, via ``manual_edit`` projector
branch ``turn_hidden``).
* T98.4 — surgical delete with cascade preview (preview modal +
rewind execution against a target turn).
* T98.5 — remaining v1 edits (chat narrative_anchor + weather).
Tests follow the T59 pattern in ``tests/test_drawer_events_threads_skip.py``
— a TestClient against the real FastAPI app with a per-test temp DB.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from chat.app import app
from chat.db.connection import open_db
from chat.eventlog.log import append_and_apply, append_event
from chat.eventlog.projector import project
@pytest.fixture
def client(tmp_path, monkeypatch):
    """Yield a ``TestClient`` for the app, wired to a per-test config and DB.

    A throwaway ``config.toml`` and a ``test.db`` path under ``tmp_path``
    are published through ``CHAT_CONFIG_PATH`` and ``CHAT_DB_PATH`` before
    the client is entered. If the app exposes
    ``app.state.background_worker``, its ``enabled`` flag is set to
    ``False`` before the client is handed to the test.
    """
    config_file = tmp_path / "config.toml"
    config_file.write_text('featherless_api_key = "test"\n')
    monkeypatch.setenv("CHAT_CONFIG_PATH", str(config_file))

    db_file = tmp_path / "test.db"
    monkeypatch.setenv("CHAT_DB_PATH", str(db_file))

    with TestClient(app) as test_client:
        if hasattr(app.state, "background_worker"):
            app.state.background_worker.enabled = False
        yield test_client
def _bot_payload(bot_id: str, name: str) -> dict:
return {
"id": bot_id,
"name": name,
"persona": "...",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "",
}
def _seed_chat(db: Path, *, with_scene: bool = True) -> int:
    """Seed a chat hosted by ``bot_a`` and return the ``chat_created`` event id.

    Appends ``bot_authored``, ``you_authored`` and ``chat_created`` events,
    plus a ``scene_opened`` event when ``with_scene`` is true, then runs
    the projector once. The returned id is that of ``chat_created`` even
    when a later ``scene_opened`` event follows it in the log.
    """
    started = "2026-04-26T20:00:00+00:00"
    you_payload = {"name": "Me", "pronouns": "they/them", "persona": ""}
    chat_payload = {
        "id": "chat_bot_a",
        "host_bot_id": "bot_a",
        "initial_time": started,
        "narrative_anchor": "Day 1",
        "weather": "",
    }
    scene_payload = {
        "chat_id": "chat_bot_a",
        "container_id": None,
        "started_at": started,
        "participants": ["you", "bot_a"],
    }

    with open_db(db) as conn:
        append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
        append_event(conn, kind="you_authored", payload=you_payload)
        created_id = append_event(conn, kind="chat_created", payload=chat_payload)
        if with_scene:
            append_event(conn, kind="scene_opened", payload=scene_payload)
        project(conn)
    return created_id
# ---------------------------------------------------------------------------
# T98.1 — branching UI.
# ---------------------------------------------------------------------------
def test_t98_1_create_branch_emits_branch_created_and_renders(client, tmp_path):
    """Creating a branch logs one ``branch_created`` event and renders it.

    Checks the event-log count, the branch record returned by
    ``get_branch``, and that the response body contains the branches
    heading and the new branch name.
    """
    from chat.state.branches import get_branch

    db_path = tmp_path / "test.db"
    origin_id = _seed_chat(db_path)

    resp = client.post(
        "/chats/chat_bot_a/drawer/branch/create",
        data={"name": "experiment_a", "origin_event_id": str(origin_id)},
    )
    assert resp.status_code == 200

    with open_db(db_path) as conn:
        created_count = conn.execute(
            "SELECT COUNT(*) FROM event_log WHERE kind = 'branch_created'"
        ).fetchone()[0]
        assert created_count == 1

        branch = get_branch(conn, "experiment_a")
        assert branch is not None
        assert branch["origin_event_id"] == origin_id
        assert branch["chat_id"] == "chat_bot_a"

    # The returned drawer partial must list the freshly created branch.
    html = resp.text
    assert "<h3>Branches</h3>" in html
    assert "experiment_a" in html
def test_t98_1_switch_branch_marks_active_and_unknown_400s(client, tmp_path):
    """Switching to a known branch activates it; an unknown name gets 400."""
    from chat.services.branching import branch_from_event
    from chat.state.branches import active_branch

    switch_url = "/chats/chat_bot_a/drawer/branch/switch"
    db_path = tmp_path / "test.db"
    origin_id = _seed_chat(db_path)

    # Seed the branch through the service layer so only the switch route
    # is exercised over HTTP.
    with open_db(db_path) as conn:
        branch_from_event(
            conn, name="experiment_b", origin_event_id=origin_id, chat_id="chat_bot_a"
        )

    switched = client.post(switch_url, data={"name": "experiment_b"})
    assert switched.status_code == 200

    with open_db(db_path) as conn:
        current = active_branch(conn)
        assert current is not None
        assert current["name"] == "experiment_b"

    # A branch name that was never created must be rejected.
    rejected = client.post(switch_url, data={"name": "ghost_branch"})
    assert rejected.status_code == 400
def test_t98_1_branch_from_turn_emits_branch_created(client, tmp_path):
    """Branching from a specific turn records that turn as the branch origin.

    A second request reusing the same branch name must come back as 400.
    """
    from chat.state.branches import get_branch

    db_path = tmp_path / "test.db"
    chat_created_id = _seed_chat(db_path)

    # Add one more event so there is a turn distinct from chat_created to
    # fork from.
    with open_db(db_path) as conn:
        turn_id = append_event(
            conn,
            kind="user_turn",
            payload={"chat_id": "chat_bot_a", "prose": "hi", "segments": []},
        )

    fork_url = f"/chats/chat_bot_a/drawer/branch/from-turn/{turn_id}"

    first = client.post(fork_url, data={"name": "fork_at_turn"})
    assert first.status_code == 200

    with open_db(db_path) as conn:
        branch = get_branch(conn, "fork_at_turn")
        assert branch is not None
        assert branch["origin_event_id"] == turn_id
        assert branch["chat_id"] == "chat_bot_a"

    # Re-using the name is rejected.
    second = client.post(fork_url, data={"name": "fork_at_turn"})
    assert second.status_code == 400

    # Sanity: the extra turn was logged after chat_created.
    assert chat_created_id < turn_id
# ---------------------------------------------------------------------------
# T98.2 — significance review panel.
# ---------------------------------------------------------------------------
def _seed_memories_for_significance(db: Path) -> list[int]:
    """Seed one ``bot_a`` memory at each of significance 0, 1 and 2.

    Each event goes through ``append_and_apply`` rather than
    ``append_event`` followed by a final ``project``, so every row is
    applied exactly once. The chat row was already materialised by
    ``_seed_chat``; per the original author's note, re-projecting would
    conflict on the ``chats.id`` UNIQUE constraint.

    Returns the ``memories.id`` values for ``chat_bot_a`` in ascending
    order.
    """
    with open_db(db) as conn:
        for level in (0, 1, 2):
            append_and_apply(
                conn,
                kind="memory_written",
                payload={
                    "owner_id": "bot_a",
                    "chat_id": "chat_bot_a",
                    "pov_summary": f"memory at significance {level}",
                    "witness_you": 1,
                    "witness_host": 1,
                    "witness_guest": 0,
                    "significance": level,
                },
            )
        memory_rows = conn.execute(
            "SELECT id FROM memories WHERE chat_id = 'chat_bot_a' "
            "ORDER BY id ASC"
        ).fetchall()
    return [int(memory_row[0]) for memory_row in memory_rows]
def test_t98_2_distribution_renders_per_significance_bucket(client, tmp_path):
    """The drawer renders the significance-review section with one bar per level.

    Only the presence of the heading and of the ``sig-bar sig-N`` markers
    for levels 0-3 is pinned; the exact count text is deliberately not
    asserted.
    """
    db_path = tmp_path / "test.db"
    _seed_chat(db_path)
    _seed_memories_for_significance(db_path)

    resp = client.get("/chats/chat_bot_a/drawer")
    assert resp.status_code == 200
    html = resp.text

    assert "<h3>Significance review</h3>" in html

    # NOTE(review): the first operand contains the second as a substring,
    # so this effectively only checks that "(3)" appears somewhere.
    assert ">★★ (3)<" in html or "(3)" in html

    # Every level gets a bar in the markup, including level 3, which has
    # no seeded memory.
    expected_markers = [f"sig-bar sig-{level}" for level in (0, 1, 2, 3)]
    for marker in expected_markers:
        assert marker in html
def test_t98_2_edit_significance_via_existing_route_lands_manual_edit(
    client, tmp_path
):
    """Posting a new significance updates ``memories`` and logs a ``manual_edit``.

    The first seeded memory starts at significance 0 and is raised to 3.
    The newest ``manual_edit`` event must identify the memory and carry
    both the prior and the new value.
    """
    import json as _json

    db = tmp_path / "test.db"
    _seed_chat(db)
    ids = _seed_memories_for_significance(db)
    target_id = ids[0]  # lowest id, seeded with significance=0

    response = client.post(
        f"/chats/chat_bot_a/drawer/memory/{target_id}/significance",
        data={"significance": "3"},
    )
    assert response.status_code == 200

    with open_db(db) as conn:
        # Significance updated in the projected table.
        row = conn.execute(
            "SELECT significance FROM memories WHERE id = ?", (target_id,)
        ).fetchone()
        # Guard before subscripting so a missing row fails as a readable
        # assertion instead of a TypeError on None.
        assert row is not None, f"memory {target_id} missing from memories"
        assert int(row[0]) == 3

        # manual_edit landed in the event log with the prior snapshot.
        log_row = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'manual_edit' ORDER BY id DESC LIMIT 1"
        ).fetchone()
        assert log_row is not None, "no manual_edit event was logged"
        payload = _json.loads(log_row[0])
        assert payload["target_kind"] == "memory_significance"
        assert int(payload["target_id"]) == target_id
        assert payload["prior_value"] == 0
        assert payload["new_value"] == 3
# ---------------------------------------------------------------------------
# T98.3 — hide-from-view toggle.
# ---------------------------------------------------------------------------
def _seed_turns(db: Path) -> tuple[int, int]:
    """Append a ``user_turn`` then an ``assistant_turn`` that references it.

    Returns ``(user_turn_event_id, assistant_turn_event_id)``.
    """
    user_payload = {
        "chat_id": "chat_bot_a",
        "prose": "How are you doing today?",
        "segments": [],
    }
    with open_db(db) as conn:
        user_turn_id = append_and_apply(conn, kind="user_turn", payload=user_payload)
        # The reply links back to the user turn through ``user_turn_id``.
        reply_payload = {
            "chat_id": "chat_bot_a",
            "speaker_id": "bot_a",
            "text": "Quite well, thanks for asking!",
            "truncated": False,
            "user_turn_id": user_turn_id,
        }
        reply_id = append_and_apply(conn, kind="assistant_turn", payload=reply_payload)
    return user_turn_id, reply_id
def test_t98_3_hide_turn_flips_event_log_hidden_via_manual_edit(
    client, tmp_path
):
    """Hiding a turn sets ``event_log.hidden`` to 1 and logs a ``manual_edit``.

    The newest ``manual_edit`` event must target the hidden turn with
    ``target_kind == "turn_hidden"`` and record the 0 -> 1 transition.
    """
    import json as _json

    db = tmp_path / "test.db"
    _seed_chat(db)
    # Only the user turn is exercised here; the assistant turn id is unused.
    user_id, _ = _seed_turns(db)

    response = client.post(
        f"/chats/chat_bot_a/drawer/turn/hide/{user_id}",
        data={"hidden": "1"},
    )
    assert response.status_code == 200

    with open_db(db) as conn:
        # event_log.hidden flipped to 1.
        row = conn.execute(
            "SELECT hidden FROM event_log WHERE id = ?", (user_id,)
        ).fetchone()
        # Guard before subscripting so a missing row fails as a readable
        # assertion instead of a TypeError on None.
        assert row is not None, f"event {user_id} missing from event_log"
        assert int(row[0]) == 1

        # manual_edit landed with the prior snapshot.
        log = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'manual_edit' ORDER BY id DESC LIMIT 1"
        ).fetchone()
        assert log is not None, "no manual_edit event was logged"
        payload = _json.loads(log[0])
        assert payload["target_kind"] == "turn_hidden"
        assert int(payload["target_id"]) == user_id
        assert payload["prior_value"] == {"hidden": 0}
        assert payload["new_value"] == {"hidden": 1}
def test_t98_3_hidden_turn_disappears_from_read_recent_dialogue(
    client, tmp_path
):
    """A hidden turn must no longer be returned by ``read_recent_dialogue``.

    Per the original author's note, ``read_recent_dialogue``
    (``chat.services.turn_common``) filters on ``hidden = 0`` server-side,
    so flipping the flag through the drawer route has to take effect on
    the very next read.
    """
    from chat.services.turn_common import read_recent_dialogue

    db_path = tmp_path / "test.db"
    _seed_chat(db_path)
    user_id, bot_id = _seed_turns(db_path)

    def visible_event_ids():
        """Return the event ids currently surfaced for ``chat_bot_a``."""
        with open_db(db_path) as conn:
            dialogue = read_recent_dialogue(conn, "chat_bot_a", limit=10)
        return [turn["event_id"] for turn in dialogue]

    # Baseline: both seeded turns are visible before anything is hidden.
    ids_before = visible_event_ids()
    assert user_id in ids_before
    assert bot_id in ids_before

    # Hide the user turn through the drawer route.
    resp = client.post(
        f"/chats/chat_bot_a/drawer/turn/hide/{user_id}",
        data={"hidden": "1"},
    )
    assert resp.status_code == 200

    ids_after = visible_event_ids()
    assert user_id not in ids_after
    assert bot_id in ids_after  # the untouched bot turn is still returned
# ---------------------------------------------------------------------------
# T98.4 — surgical delete with cascade preview.
# ---------------------------------------------------------------------------
def test_t98_4_delete_preview_returns_impact_report_html(client, tmp_path):
    """The delete-preview route returns the impact modal for the target turn.

    Checks for the modal and cascade markers, the confirmation prompt
    naming the event id, both turn kinds in the cascade, and a link to the
    delete route for the same event.
    """
    db_path = tmp_path / "test.db"
    _seed_chat(db_path)
    user_id, _bot_id = _seed_turns(db_path)

    resp = client.get(f"/chats/chat_bot_a/drawer/turn/delete-preview/{user_id}")
    assert resp.status_code == 200
    html = resp.text

    # Modal scaffold, including the event id and the cascade container.
    assert "delete-impact-modal" in html
    assert f"Delete event {user_id}?" in html
    assert "delete-impact-cascade" in html

    # The cascade covers the targeted user_turn and the assistant_turn
    # appended after it.
    assert "user_turn" in html
    assert "assistant_turn" in html

    # The confirmation form targets the delete route for this event.
    assert f"/drawer/turn/delete/{user_id}" in html
def test_t98_4_delete_invokes_rewind_and_drops_cascade(client, tmp_path):
    """Deleting a turn removes it and every later turn from the event log.

    Three turns are seeded (user, assistant, follow-up user). Deleting the
    first one must leave no ``user_turn`` / ``assistant_turn`` rows and no
    row for any of the three event ids.
    """
    turn_count_sql = (
        "SELECT COUNT(*) FROM event_log "
        "WHERE kind IN ('user_turn', 'assistant_turn')"
    )
    db_path = tmp_path / "test.db"
    _seed_chat(db_path)
    user_id, bot_id = _seed_turns(db_path)

    # A third turn after the assistant reply proves the cascade reaches
    # everything from user_id onwards.
    with open_db(db_path) as conn:
        extra_id = append_and_apply(
            conn,
            kind="user_turn",
            payload={
                "chat_id": "chat_bot_a",
                "prose": "follow-up",
                "segments": [],
            },
        )

    # Baseline: all three turn rows are present.
    with open_db(db_path) as conn:
        turns_before = conn.execute(turn_count_sql).fetchone()[0]
    assert turns_before == 3

    # Delete from user_id forward.
    resp = client.post(f"/chats/chat_bot_a/drawer/turn/delete/{user_id}")
    assert resp.status_code == 200

    # Nothing turn-shaped survives, and each individual id is gone.
    with open_db(db_path) as conn:
        turns_after = conn.execute(turn_count_sql).fetchone()[0]
        assert turns_after == 0
        for ev_id in (user_id, bot_id, extra_id):
            survivor = conn.execute(
                "SELECT 1 FROM event_log WHERE id = ?", (ev_id,)
            ).fetchone()
            assert survivor is None, f"event {ev_id} should have been deleted"
def test_delete_turn_with_event_id_zero_returns_400(client, tmp_path):
    """T110.1: posting ``event_id = 0`` to the delete route must return 400.

    A non-positive id is a client error; it must NOT be turned into
    ``after_event_id = -1`` and silently rewind the whole log. The test
    also verifies that the event-log row count is unchanged afterwards.
    """
    total_sql = "SELECT COUNT(*) FROM event_log"
    db_path = tmp_path / "test.db"
    _seed_chat(db_path)
    _seed_turns(db_path)

    # Baseline: the log is non-empty before the bad request.
    with open_db(db_path) as conn:
        rows_before = conn.execute(total_sql).fetchone()[0]
    assert rows_before > 0

    resp = client.post("/chats/chat_bot_a/drawer/turn/delete/0")
    assert resp.status_code == 400

    # The rejected request must not have truncated anything.
    with open_db(db_path) as conn:
        rows_after = conn.execute(total_sql).fetchone()[0]
    assert rows_after == rows_before
# ---------------------------------------------------------------------------
# T98.5 — remaining v1 edits (chat narrative anchor + weather).
# ---------------------------------------------------------------------------
def test_t98_5_edit_chat_narrative_anchor_emits_manual_edit(client, tmp_path):
    """Editing the narrative anchor updates ``chat_state`` and logs a ``manual_edit``.

    The logged payload must carry the seeded anchor (``"Day 1"``) as
    ``prior_value`` and the submitted text as ``new_value``.
    """
    import json as _json

    db = tmp_path / "test.db"
    _seed_chat(db)

    response = client.post(
        "/chats/chat_bot_a/drawer/chat/narrative-anchor",
        data={"new_value": "Late evening, after dinner"},
    )
    assert response.status_code == 200

    with open_db(db) as conn:
        row = conn.execute(
            "SELECT narrative_anchor FROM chat_state WHERE chat_id = ?",
            ("chat_bot_a",),
        ).fetchone()
        # Guard before subscripting so a missing row fails as a readable
        # assertion instead of a TypeError on None.
        assert row is not None, "chat_state row for chat_bot_a is missing"
        assert row[0] == "Late evening, after dinner"

        log = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'manual_edit' ORDER BY id DESC LIMIT 1"
        ).fetchone()
        assert log is not None, "no manual_edit event was logged"
        payload = _json.loads(log[0])
        assert payload["target_kind"] == "chat_narrative_anchor"
        assert payload["target_id"] == "chat_bot_a"
        assert payload["prior_value"] == "Day 1"
        assert payload["new_value"] == "Late evening, after dinner"
def test_t98_5_edit_chat_weather_emits_manual_edit(client, tmp_path):
    """Editing the weather updates ``chat_state`` and logs a ``manual_edit``.

    The logged payload must carry the seeded (empty) weather as
    ``prior_value`` and the submitted text as ``new_value``.
    """
    import json as _json

    db = tmp_path / "test.db"
    _seed_chat(db)

    response = client.post(
        "/chats/chat_bot_a/drawer/chat/weather",
        data={"new_value": "thunderstorm rolling in"},
    )
    assert response.status_code == 200

    with open_db(db) as conn:
        row = conn.execute(
            "SELECT weather FROM chat_state WHERE chat_id = ?",
            ("chat_bot_a",),
        ).fetchone()
        # Guard before subscripting so a missing row fails as a readable
        # assertion instead of a TypeError on None.
        assert row is not None, "chat_state row for chat_bot_a is missing"
        assert row[0] == "thunderstorm rolling in"

        log = conn.execute(
            "SELECT payload_json FROM event_log "
            "WHERE kind = 'manual_edit' ORDER BY id DESC LIMIT 1"
        ).fetchone()
        assert log is not None, "no manual_edit event was logged"
        payload = _json.loads(log[0])
        assert payload["target_kind"] == "chat_weather"
        assert payload["target_id"] == "chat_bot_a"
        assert payload["prior_value"] == ""
        assert payload["new_value"] == "thunderstorm rolling in"