from __future__ import annotations import logging from chat.db.connection import open_db from chat.db.migrate import apply_migrations from chat.eventlog.log import append_event from chat.eventlog.projector import project import chat.state.branches # registers handlers from chat.state.branches import active_branch, get_branch, list_branches def test_main_branch_bootstrapped_by_migration(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: active = active_branch(conn) assert active is not None assert active["name"] == "main" assert active["is_active"] is True assert active["origin_event_id"] == 0 assert active["head_event_id"] == 0 def test_branch_created_inserts_row(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: append_event( conn, kind="branch_created", payload={ "name": "experiment", "origin_event_id": 42, "chat_id": "chat_a", }, ) project(conn) b = get_branch(conn, "experiment") assert b is not None assert b["name"] == "experiment" assert b["origin_event_id"] == 42 # head defaults to origin when not specified assert b["head_event_id"] == 42 assert b["chat_id"] == "chat_a" assert b["is_active"] is False # main remains active active = active_branch(conn) assert active is not None assert active["name"] == "main" def test_branch_switched_atomic(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: append_event( conn, kind="branch_created", payload={ "name": "experiment", "origin_event_id": 5, "chat_id": "chat_a", }, ) append_event( conn, kind="branch_switched", payload={"name": "experiment"}, ) project(conn) active = active_branch(conn) assert active is not None assert active["name"] == "experiment" main = get_branch(conn, "main") assert main is not None assert main["is_active"] is False # switch back append_event( conn, kind="branch_switched", payload={"name": "main"}, ) project(conn) active2 = active_branch(conn) assert active2 is not None assert active2["name"] == "main" experiment = get_branch(conn, "experiment") assert experiment is not None assert experiment["is_active"] is False def test_branch_head_updated_changes_head(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: append_event( conn, kind="branch_created", payload={ "name": "experiment", "origin_event_id": 10, "head_event_id": 10, "chat_id": "chat_a", }, ) append_event( conn, kind="branch_head_updated", payload={"name": "experiment", "head_event_id": 20}, ) project(conn) b = get_branch(conn, "experiment") assert b is not None assert b["head_event_id"] == 20 def test_list_branches_returns_all(tmp_path): db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: append_event( conn, kind="branch_created", payload={ "name": "experiment", "origin_event_id": 1, "chat_id": "chat_a", }, ) project(conn) names = [b["name"] for b in list_branches(conn)] assert "main" in names assert "experiment" in names def test_branch_switched_unknown_name_warns(tmp_path, caplog): """Switching to a nonexistent branch logs a warning and leaves no branch active. The previous behavior silently cleared is_active flags and applied no UPDATE when the named branch did not exist. T103 makes that condition observable by emitting a warning while preserving the existing (zero-active) outcome. """ db = tmp_path / "t.db" apply_migrations(db) with open_db(db) as conn: with caplog.at_level(logging.WARNING, logger="chat.state.branches"): append_event( conn, kind="branch_switched", payload={"name": "does_not_exist"}, ) project(conn) # A warning was emitted naming the missing branch. warnings = [ r for r in caplog.records if r.levelno == logging.WARNING and r.name == "chat.state.branches" ] assert warnings, "expected a warning for unknown branch name" assert any("does_not_exist" in r.getMessage() for r in warnings) # Existing behavior preserved: no branch is active after the switch. assert active_branch(conn) is None # The unknown name was not inserted as a side effect. assert get_branch(conn, "does_not_exist") is None