diff --git a/chat/db/migrations/0010_threads.sql b/chat/db/migrations/0010_threads.sql new file mode 100644 index 0000000..6f61d33 --- /dev/null +++ b/chat/db/migrations/0010_threads.sql @@ -0,0 +1,14 @@ +CREATE TABLE threads ( + id INTEGER PRIMARY KEY, + thread_id TEXT NOT NULL UNIQUE, + chat_id TEXT NOT NULL, + title TEXT NOT NULL, + summary TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'open', + opened_at TEXT NOT NULL DEFAULT (datetime('now')), + closed_at TEXT, + last_referenced_scene_id INTEGER, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); +CREATE INDEX threads_chat_status_idx ON threads(chat_id, status); diff --git a/chat/state/threads.py b/chat/state/threads.py new file mode 100644 index 0000000..6aa6407 --- /dev/null +++ b/chat/state/threads.py @@ -0,0 +1,123 @@ +from __future__ import annotations +from sqlite3 import Connection + +from chat.eventlog.projector import on +from chat.eventlog.log import Event + + +@on("thread_opened") +def _apply_thread_opened(conn: Connection, e: Event) -> None: + p = e.payload + conn.execute( + "INSERT OR IGNORE INTO threads " + "(thread_id, chat_id, title, summary, status) " + "VALUES (?, ?, ?, ?, 'open')", + ( + p["thread_id"], + p["chat_id"], + p["title"], + p.get("summary", ""), + ), + ) + + +@on("thread_updated") +def _apply_thread_updated(conn: Connection, e: Event) -> None: + p = e.payload + # Idempotent: closed threads ignore subsequent updates. + conn.execute( + "UPDATE threads SET summary = ?, last_referenced_scene_id = ?, " + "updated_at = datetime('now') " + "WHERE thread_id = ? AND status = 'open'", + ( + p.get("summary", ""), + p.get("last_referenced_scene_id"), + p["thread_id"], + ), + ) + + +@on("thread_closed") +def _apply_thread_closed(conn: Connection, e: Event) -> None: + p = e.payload + conn.execute( + "UPDATE threads SET status = 'closed', closed_at = ?, " + "updated_at = datetime('now') " + "WHERE thread_id = ? AND status = 'open'", + (p.get("closed_at"), p["thread_id"]), + ) + + +def get_thread(conn: Connection, thread_id: str) -> dict | None: + row = conn.execute( + "SELECT thread_id, chat_id, title, summary, status, " + "opened_at, closed_at, last_referenced_scene_id, " + "created_at, updated_at " + "FROM threads WHERE thread_id = ?", + (thread_id,), + ).fetchone() + if not row: + return None + return { + "thread_id": row[0], + "chat_id": row[1], + "title": row[2], + "summary": row[3], + "status": row[4], + "opened_at": row[5], + "closed_at": row[6], + "last_referenced_scene_id": row[7], + "created_at": row[8], + "updated_at": row[9], + } + + +def list_open_threads(conn: Connection, chat_id: str) -> list[dict]: + rows = conn.execute( + "SELECT thread_id, chat_id, title, summary, status, " + "opened_at, closed_at, last_referenced_scene_id, " + "created_at, updated_at " + "FROM threads WHERE chat_id = ? AND status = 'open' " + "ORDER BY id ASC", + (chat_id,), + ).fetchall() + return [ + { + "thread_id": r[0], "chat_id": r[1], "title": r[2], + "summary": r[3], "status": r[4], + "opened_at": r[5], "closed_at": r[6], + "last_referenced_scene_id": r[7], + "created_at": r[8], "updated_at": r[9], + } + for r in rows + ] + + +def list_threads(conn: Connection, chat_id: str, status: str | None = None) -> list[dict]: + if status is None: + rows = conn.execute( + "SELECT thread_id, chat_id, title, summary, status, " + "opened_at, closed_at, last_referenced_scene_id, " + "created_at, updated_at " + "FROM threads WHERE chat_id = ? ORDER BY id ASC", + (chat_id,), + ).fetchall() + else: + rows = conn.execute( + "SELECT thread_id, chat_id, title, summary, status, " + "opened_at, closed_at, last_referenced_scene_id, " + "created_at, updated_at " + "FROM threads WHERE chat_id = ? AND status = ? " + "ORDER BY id ASC", + (chat_id, status), + ).fetchall() + return [ + { + "thread_id": r[0], "chat_id": r[1], "title": r[2], + "summary": r[3], "status": r[4], + "opened_at": r[5], "closed_at": r[6], + "last_referenced_scene_id": r[7], + "created_at": r[8], "updated_at": r[9], + } + for r in rows + ] diff --git a/tests/test_threads_state.py b/tests/test_threads_state.py new file mode 100644 index 0000000..6d6482d --- /dev/null +++ b/tests/test_threads_state.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +from chat.db.connection import open_db +from chat.db.migrate import apply_migrations +from chat.eventlog.log import append_and_apply, append_event +from chat.eventlog.projector import project +import chat.state.entities # registers handlers +import chat.state.world # registers handlers +import chat.state.threads # registers handlers +from chat.state.threads import get_thread, list_open_threads + + +def _bot_payload(bot_id: str, name: str) -> dict: + return { + "id": bot_id, + "name": name, + "persona": "thoughtful, observant", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "coworker", + "kickoff_prose": "", + } + + +def _chat_payload(chat_id: str = "chat_bot_a") -> dict: + return { + "id": chat_id, + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1 evening", + "weather": "clear", + } + + +def test_thread_opened_creates_row(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) + append_event(conn, kind="chat_created", payload=_chat_payload()) + append_event( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_abc", + "chat_id": "chat_bot_a", + "title": "Maya's job hunt", + "summary": "Maya is looking for a new job", + }, + ) + project(conn) + + t = get_thread(conn, "thr_abc") + assert t is not None + assert t["thread_id"] == "thr_abc" + assert t["chat_id"] == "chat_bot_a" + assert t["title"] == "Maya's job hunt" + assert t["summary"] == "Maya is looking for a new job" + assert t["status"] == "open" + assert t["closed_at"] is None + assert t["last_referenced_scene_id"] is None + + +def test_thread_updated_changes_summary_and_last_referenced(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) + append_event(conn, kind="chat_created", payload=_chat_payload()) + append_event( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_abc", + "chat_id": "chat_bot_a", + "title": "Maya's job hunt", + "summary": "Maya is looking for a new job", + }, + ) + append_event( + conn, + kind="thread_updated", + payload={ + "thread_id": "thr_abc", + "summary": "Maya landed an interview at a startup", + "last_referenced_scene_id": 42, + }, + ) + project(conn) + + t = get_thread(conn, "thr_abc") + assert t is not None + assert t["summary"] == "Maya landed an interview at a startup" + assert t["last_referenced_scene_id"] == 42 + assert t["status"] == "open" + + +def test_thread_closed_terminal(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) + append_event(conn, kind="chat_created", payload=_chat_payload()) + append_event( + conn, + kind="thread_opened", + payload={ + "thread_id": "thr_abc", + "chat_id": "chat_bot_a", + "title": "Maya's job hunt", + "summary": "Maya is looking for a new job", + }, + ) + append_event( + conn, + kind="thread_closed", + payload={ + "thread_id": "thr_abc", + "closed_at": "2026-04-26T21:00:00+00:00", + }, + ) + project(conn) + + t = get_thread(conn, "thr_abc") + assert t is not None + assert t["status"] == "closed" + assert t["closed_at"] == "2026-04-26T21:00:00+00:00" + + # Subsequent updates to a closed thread are no-ops. + append_and_apply( + conn, + kind="thread_updated", + payload={ + "thread_id": "thr_abc", + "summary": "should not be applied", + }, + ) + + t2 = get_thread(conn, "thr_abc") + assert t2 is not None + assert t2["summary"] == "Maya is looking for a new job" + assert t2["status"] == "closed" + + +def test_list_open_threads_filters_to_open_only(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) + append_event(conn, kind="chat_created", payload=_chat_payload()) + for tid, title in [ + ("thr_1", "Arc 1"), + ("thr_2", "Arc 2"), + ("thr_3", "Arc 3"), + ]: + append_event( + conn, + kind="thread_opened", + payload={ + "thread_id": tid, + "chat_id": "chat_bot_a", + "title": title, + "summary": "", + }, + ) + append_event( + conn, + kind="thread_closed", + payload={ + "thread_id": "thr_3", + "closed_at": "2026-04-26T21:00:00+00:00", + }, + ) + project(conn) + + open_threads = list_open_threads(conn, "chat_bot_a") + assert len(open_threads) == 2 + ids = {t["thread_id"] for t in open_threads} + assert ids == {"thr_1", "thr_2"}