"""Tests for the branching service (T94, Phase 4).""" from __future__ import annotations import pytest from chat.db.connection import open_db from chat.db.migrate import apply_migrations from chat.eventlog.log import append_and_apply import chat.state.branches # noqa: F401 registers handlers from chat.services.branching import ( branch_from_event, list_branches_with_metadata, switch_active_branch, ) from chat.state.branches import active_branch, get_branch def _seed_event(conn) -> int: """Append a benign event so we have a real event_log row to fork from. ``user_turn`` is a transcript-only kind with no registered projector handler, so ``append_and_apply`` is a clean no-op on the projector side regardless of what other handlers are imported by the suite. """ return append_and_apply( conn, kind="user_turn", payload={"chat_id": "c1", "text": "hi"}, ) def test_branch_from_event_creates_branch_via_event(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: seed_id = _seed_event(conn) new_id = branch_from_event( conn, name="experiment", origin_event_id=seed_id, chat_id="c1", ) assert isinstance(new_id, int) and new_id > 0 b = get_branch(conn, "experiment") assert b is not None assert b["id"] == new_id assert b["origin_event_id"] == seed_id assert b["head_event_id"] == seed_id assert b["chat_id"] == "c1" assert b["is_active"] is False # branch_created event landed in event_log row = conn.execute( "SELECT COUNT(*) FROM event_log WHERE kind = 'branch_created'" ).fetchone() assert row[0] == 1 def test_branch_from_event_duplicate_name_raises(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: seed_id = _seed_event(conn) branch_from_event(conn, name="dup", origin_event_id=seed_id) with pytest.raises(ValueError, match="already exists"): branch_from_event(conn, name="dup", origin_event_id=seed_id) def test_branch_from_event_invalid_origin_raises(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: with pytest.raises(ValueError, match="does not exist"): branch_from_event(conn, name="ghost", origin_event_id=99999) def test_switch_active_branch_changes_active(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: seed_id = _seed_event(conn) branch_from_event(conn, name="experiment", origin_event_id=seed_id) switch_active_branch(conn, name="experiment") active = active_branch(conn) assert active is not None assert active["name"] == "experiment" # Switch back to main. switch_active_branch(conn, name="main") active2 = active_branch(conn) assert active2 is not None assert active2["name"] == "main" def test_switch_active_branch_unknown_name_raises(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: with pytest.raises(ValueError, match="does not exist"): switch_active_branch(conn, name="nope") def test_list_branches_with_metadata_includes_event_count(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: # Seed enough events to cover origin=10 and head=15. for _ in range(15): _seed_event(conn) # Create the branch at origin=10, then bump its head to 15. branch_from_event(conn, name="exp", origin_event_id=10) append_and_apply( conn, kind="branch_head_updated", payload={"name": "exp", "head_event_id": 15}, ) rows = {b["name"]: b for b in list_branches_with_metadata(conn)} # main: bootstrap state — origin=0, head=0 — event_count == 0. assert rows["main"]["event_count"] == 0 # exp: origin=10, head=15 — event_count == 6 (inclusive). assert rows["exp"]["origin_event_id"] == 10 assert rows["exp"]["head_event_id"] == 15 assert rows["exp"]["event_count"] == 6 # --------------------------------------------------------------------------- # T113 read-side filter — cross-feature tests. # --------------------------------------------------------------------------- # # These exercise the active-branch event-id clamp through every reader # the spec called out: ``read_recent_dialogue`` (turn_common), # ``_read_recent_dialogue`` (scene_summarize), and ``search_memories`` # (memory). They drive the readers via real event-log inserts + branch # switches so the integration is end-to-end. def _seed_user_turn(conn, chat_id: str, prose: str) -> int: return append_and_apply( conn, kind="user_turn", payload={"chat_id": chat_id, "prose": prose, "segments": []}, ) def test_read_recent_dialogue_respects_active_branch_head(tmp_path): """T113 spec test 1: dialogue reader clamps to active branch head. Seed 10 user turns; create a branch with origin=1 + head=5 and switch to it; assert ``read_recent_dialogue`` only returns the first 5 turns. (The 5 events with id 6..10 fall outside ``[1, 5]``.) """ from chat.services.turn_common import read_recent_dialogue db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: ids = [_seed_user_turn(conn, "c1", f"turn {i}") for i in range(10)] # 5 events visible after the switch. branch_from_event( conn, name="halfway", origin_event_id=ids[0], chat_id="c1" ) append_and_apply( conn, kind="branch_head_updated", payload={"name": "halfway", "head_event_id": ids[4]}, ) switch_active_branch(conn, name="halfway") rows = read_recent_dialogue(conn, "c1") # The reader returns oldest-first, so the visible-set is the # first 5 turns. assert len(rows) == 5 assert [r["text"] for r in rows] == [f"turn {i}" for i in range(5)] def test_search_memories_respects_active_branch_head(tmp_path): """T113 spec test 2: memory search clamps to active branch head via ``memories.event_id``. Memories whose projecting event lands outside the clamp drop out of FTS results.""" from chat.eventlog.log import append_and_apply as _aa from chat.state.memory import search_memories db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: # Two memories projected from real events. The projector handler # stamps memories.event_id from the projecting event's id. ev_a = _aa( conn, kind="memory_written", payload={ "owner_id": "host_bot", "chat_id": "c1", "scene_id": 1, "pov_summary": "alpha keyword present", "witness_you": 1, "witness_host": 1, "witness_guest": 0, }, ) ev_b = _aa( conn, kind="memory_written", payload={ "owner_id": "host_bot", "chat_id": "c1", "scene_id": 1, "pov_summary": "alpha keyword present too", "witness_you": 1, "witness_host": 1, "witness_guest": 0, }, ) # Branch clamps to ev_a only (head = ev_a; ev_b sits past head). branch_from_event( conn, name="early", origin_event_id=ev_a, chat_id="c1" ) switch_active_branch(conn, name="early") results = search_memories(conn, "host_bot", "host", "alpha") # Only the first memory should surface — the second's event_id # exceeds the active branch head. ids = [r["event_id"] for r in results] assert ev_a in ids assert ev_b not in ids def test_branch_switch_changes_visible_events(tmp_path): """T113 spec test 3: switching branches mid-flight changes the read immediately. ``read_recent_dialogue`` re-queries on every call.""" from chat.services.turn_common import read_recent_dialogue db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: ids = [_seed_user_turn(conn, "c1", f"turn {i}") for i in range(6)] branch_from_event( conn, name="early", origin_event_id=ids[0], chat_id="c1" ) append_and_apply( conn, kind="branch_head_updated", payload={"name": "early", "head_event_id": ids[2]}, ) branch_from_event( conn, name="late", origin_event_id=ids[3], chat_id="c1" ) append_and_apply( conn, kind="branch_head_updated", payload={"name": "late", "head_event_id": ids[5]}, ) switch_active_branch(conn, name="early") early_rows = [r["text"] for r in read_recent_dialogue(conn, "c1")] assert early_rows == ["turn 0", "turn 1", "turn 2"] switch_active_branch(conn, name="late") late_rows = [r["text"] for r in read_recent_dialogue(conn, "c1")] assert late_rows == ["turn 3", "turn 4", "turn 5"] def test_main_branch_with_head_zero_returns_empty(tmp_path): """T113 spec test 4: a non-main branch with head=0 returns empty. The bootstrap-main sentinel only fires for ``name=="main", origin=0, head=0``. A different branch parked at ``origin=0, head=0`` is not a sentinel and the ``BETWEEN 0 AND 0`` clamp filters out every real event_log row (rowids start at 1).""" from chat.services.turn_common import read_recent_dialogue db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: # Need a real event_log row id 1+ so the clamp's "exclude 0" actually # has something to exclude — otherwise we trivially return []. _seed_user_turn(conn, "c1", "turn 0") # Force-create a branch at origin=0, head=0 (NOT main). This is an # artificial state — production never produces it — but it's the # cleanest way to drive the documented edge case. append_and_apply( conn, kind="branch_created", payload={ "name": "stub", "origin_event_id": 0, "head_event_id": 0, "chat_id": "c1", }, ) switch_active_branch(conn, name="stub") rows = read_recent_dialogue(conn, "c1") assert rows == [] def test_no_active_branch_falls_through_to_all_events(tmp_path): """T113 spec test 5: with no active branch (e.g. a switch to an unknown name cleared all is_active flags), readers see the full log via the ``(0, BIG_INT)`` defensive default.""" from chat.services.turn_common import read_recent_dialogue db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: for i in range(3): _seed_user_turn(conn, "c1", f"turn {i}") # Switching to an unknown branch leaves zero rows with is_active=1. append_and_apply( conn, kind="branch_switched", payload={"name": "missing"}, ) from chat.state.branches import active_branch as _ab assert _ab(conn) is None rows = read_recent_dialogue(conn, "c1") assert [r["text"] for r in rows] == ["turn 0", "turn 1", "turn 2"] def test_scene_summarize_read_recent_dialogue_respects_branch(tmp_path): """T113: ``scene_summarize._read_recent_dialogue`` (the scene-close summary input) also clamps to the active branch range.""" from chat.services.scene_summarize import _read_recent_dialogue db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: ids = [_seed_user_turn(conn, "c1", f"turn {i}") for i in range(6)] branch_from_event( conn, name="early", origin_event_id=ids[0], chat_id="c1" ) append_and_apply( conn, kind="branch_head_updated", payload={"name": "early", "head_event_id": ids[2]}, ) switch_active_branch(conn, name="early") rows = _read_recent_dialogue(conn, "c1") assert [r["text"] for r in rows] == ["turn 0", "turn 1", "turn 2"] def test_meanwhile_dialogue_reader_respects_branch(tmp_path): """T113: meanwhile prompt-context reader also clamps to the active branch. The meanwhile reader filters by ``meanwhile_scene_id``; the branch filter is composed on top of that filter.""" from chat.web.meanwhile import _read_recent_meanwhile_dialogue db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: # Seed user turns + meanwhile assistant turns interleaved so the # branch-id clamp lands across both kinds. u1 = _seed_user_turn(conn, "c1", "u1") a1 = append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "c1", "speaker_id": "host", "text": "a1", "meanwhile_scene_id": 7, }, ) # Past-head turn should NOT appear once we switch to ``early``. a2 = append_and_apply( conn, kind="assistant_turn", payload={ "chat_id": "c1", "speaker_id": "guest", "text": "a2", "meanwhile_scene_id": 7, }, ) branch_from_event( conn, name="early", origin_event_id=u1, chat_id="c1" ) append_and_apply( conn, kind="branch_head_updated", payload={"name": "early", "head_event_id": a1}, ) switch_active_branch(conn, name="early") rows = _read_recent_meanwhile_dialogue(conn, "c1", scene_id=7) texts = [r["text"] for r in rows] assert "a1" in texts assert "a2" not in texts # Suppress the "unused" linter warning while keeping the binding # readable for the test narrative. _ = a2