merge: T89 branches table + projector handlers

This commit is contained in:
Joseph Doherty
2026-04-27 02:27:48 -04:00
3 changed files with 291 additions and 0 deletions
+17
View File
@@ -0,0 +1,17 @@
CREATE TABLE branches (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
origin_event_id INTEGER NOT NULL,
head_event_id INTEGER NOT NULL,
chat_id TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
is_active INTEGER NOT NULL DEFAULT 0
);
-- Exactly one row may have is_active = 1 at any time.
CREATE UNIQUE INDEX branches_active_idx ON branches(is_active) WHERE is_active = 1;
-- Bootstrap the main branch. origin_event_id=0 + head_event_id=0 are
-- placeholder seeds; the orchestrator updates head as new events land.
INSERT INTO branches (name, origin_event_id, head_event_id, is_active)
VALUES ('main', 0, 0, 1);
+133
View File
@@ -0,0 +1,133 @@
"""Branches projector + readers (T89, Phase 4).
A branch is a named fork of the event log. The 'main' branch is bootstrapped
by migration 0013 with is_active=1. Subsequent branches reference an
origin_event_id (the event they forked from). Phase 4 enables creation
and switching; the read-side filter (event readers consulting is_active)
is a Phase 4.5 follow-up — for now branches are metadata-only and the
existing event readers remain branch-agnostic.
"""
from __future__ import annotations
from sqlite3 import Connection
from chat.eventlog.projector import on
from chat.eventlog.log import Event
@on("branch_created")
def _apply_branch_created(conn: Connection, e: Event) -> None:
"""Insert a new branch row with is_active=0. Idempotent via INSERT OR IGNORE."""
p = e.payload
conn.execute(
"INSERT OR IGNORE INTO branches "
"(name, origin_event_id, head_event_id, chat_id, is_active) "
"VALUES (?, ?, ?, ?, 0)",
(
p["name"],
int(p["origin_event_id"]),
int(p.get("head_event_id", p["origin_event_id"])),
p.get("chat_id"),
),
)
@on("branch_switched")
def _apply_branch_switched(conn: Connection, e: Event) -> None:
"""Set is_active=1 on the named branch and is_active=0 on all others.
Atomic via two UPDATEs ordered to avoid the unique-active-index race.
"""
p = e.payload
name = p["name"]
# Clear ALL is_active flags first (avoids the unique-index trip).
conn.execute("UPDATE branches SET is_active = 0 WHERE is_active = 1")
conn.execute(
"UPDATE branches SET is_active = 1 WHERE name = ?",
(name,),
)
@on("branch_head_updated")
def _apply_branch_head_updated(conn: Connection, e: Event) -> None:
"""Update head_event_id on the named branch."""
p = e.payload
conn.execute(
"UPDATE branches SET head_event_id = ? WHERE name = ?",
(int(p["head_event_id"]), p["name"]),
)
def get_branch(conn: Connection, name: str) -> dict | None:
row = conn.execute(
"SELECT id, name, origin_event_id, head_event_id, chat_id, "
" created_at, is_active "
"FROM branches WHERE name = ?",
(name,),
).fetchone()
if not row:
return None
return {
"id": row[0],
"name": row[1],
"origin_event_id": row[2],
"head_event_id": row[3],
"chat_id": row[4],
"created_at": row[5],
"is_active": bool(row[6]),
}
def list_branches(conn: Connection, chat_id: str | None = None) -> list[dict]:
if chat_id is None:
rows = conn.execute(
"SELECT id, name, origin_event_id, head_event_id, chat_id, "
" created_at, is_active "
"FROM branches ORDER BY id ASC"
).fetchall()
else:
rows = conn.execute(
"SELECT id, name, origin_event_id, head_event_id, chat_id, "
" created_at, is_active "
"FROM branches WHERE chat_id = ? OR chat_id IS NULL "
"ORDER BY id ASC",
(chat_id,),
).fetchall()
return [
{
"id": r[0],
"name": r[1],
"origin_event_id": r[2],
"head_event_id": r[3],
"chat_id": r[4],
"created_at": r[5],
"is_active": bool(r[6]),
}
for r in rows
]
def active_branch(conn: Connection) -> dict | None:
row = conn.execute(
"SELECT id, name, origin_event_id, head_event_id, chat_id, "
" created_at, is_active "
"FROM branches WHERE is_active = 1"
).fetchone()
if not row:
return None
return {
"id": row[0],
"name": row[1],
"origin_event_id": row[2],
"head_event_id": row[3],
"chat_id": row[4],
"created_at": row[5],
"is_active": bool(row[6]),
}
__all__ = [
"get_branch",
"list_branches",
"active_branch",
]
+141
View File
@@ -0,0 +1,141 @@
from __future__ import annotations
from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_event
from chat.eventlog.projector import project
import chat.state.branches # registers handlers
from chat.state.branches import active_branch, get_branch, list_branches
def test_main_branch_bootstrapped_by_migration(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
active = active_branch(conn)
assert active is not None
assert active["name"] == "main"
assert active["is_active"] is True
assert active["origin_event_id"] == 0
assert active["head_event_id"] == 0
def test_branch_created_inserts_row(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(
conn,
kind="branch_created",
payload={
"name": "experiment",
"origin_event_id": 42,
"chat_id": "chat_a",
},
)
project(conn)
b = get_branch(conn, "experiment")
assert b is not None
assert b["name"] == "experiment"
assert b["origin_event_id"] == 42
# head defaults to origin when not specified
assert b["head_event_id"] == 42
assert b["chat_id"] == "chat_a"
assert b["is_active"] is False
# main remains active
active = active_branch(conn)
assert active is not None
assert active["name"] == "main"
def test_branch_switched_atomic(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(
conn,
kind="branch_created",
payload={
"name": "experiment",
"origin_event_id": 5,
"chat_id": "chat_a",
},
)
append_event(
conn,
kind="branch_switched",
payload={"name": "experiment"},
)
project(conn)
active = active_branch(conn)
assert active is not None
assert active["name"] == "experiment"
main = get_branch(conn, "main")
assert main is not None
assert main["is_active"] is False
# switch back
append_event(
conn,
kind="branch_switched",
payload={"name": "main"},
)
project(conn)
active2 = active_branch(conn)
assert active2 is not None
assert active2["name"] == "main"
experiment = get_branch(conn, "experiment")
assert experiment is not None
assert experiment["is_active"] is False
def test_branch_head_updated_changes_head(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(
conn,
kind="branch_created",
payload={
"name": "experiment",
"origin_event_id": 10,
"head_event_id": 10,
"chat_id": "chat_a",
},
)
append_event(
conn,
kind="branch_head_updated",
payload={"name": "experiment", "head_event_id": 20},
)
project(conn)
b = get_branch(conn, "experiment")
assert b is not None
assert b["head_event_id"] == 20
def test_list_branches_returns_all(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(
conn,
kind="branch_created",
payload={
"name": "experiment",
"origin_event_id": 1,
"chat_id": "chat_a",
},
)
project(conn)
names = [b["name"] for b in list_branches(conn)]
assert "main" in names
assert "experiment" in names