merge: T51 threads table + projector handlers

This commit is contained in:
Joseph Doherty
2026-04-26 20:06:45 -04:00
3 changed files with 318 additions and 0 deletions
+14
View File
@@ -0,0 +1,14 @@
CREATE TABLE threads (
id INTEGER PRIMARY KEY,
thread_id TEXT NOT NULL UNIQUE,
chat_id TEXT NOT NULL,
title TEXT NOT NULL,
summary TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'open',
opened_at TEXT NOT NULL DEFAULT (datetime('now')),
closed_at TEXT,
last_referenced_scene_id INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX threads_chat_status_idx ON threads(chat_id, status);
+123
View File
@@ -0,0 +1,123 @@
from __future__ import annotations
from sqlite3 import Connection
from chat.eventlog.projector import on
from chat.eventlog.log import Event
@on("thread_opened")
def _apply_thread_opened(conn: Connection, e: Event) -> None:
p = e.payload
conn.execute(
"INSERT OR IGNORE INTO threads "
"(thread_id, chat_id, title, summary, status) "
"VALUES (?, ?, ?, ?, 'open')",
(
p["thread_id"],
p["chat_id"],
p["title"],
p.get("summary", ""),
),
)
@on("thread_updated")
def _apply_thread_updated(conn: Connection, e: Event) -> None:
p = e.payload
# Idempotent: closed threads ignore subsequent updates.
conn.execute(
"UPDATE threads SET summary = ?, last_referenced_scene_id = ?, "
"updated_at = datetime('now') "
"WHERE thread_id = ? AND status = 'open'",
(
p.get("summary", ""),
p.get("last_referenced_scene_id"),
p["thread_id"],
),
)
@on("thread_closed")
def _apply_thread_closed(conn: Connection, e: Event) -> None:
p = e.payload
conn.execute(
"UPDATE threads SET status = 'closed', closed_at = ?, "
"updated_at = datetime('now') "
"WHERE thread_id = ? AND status = 'open'",
(p.get("closed_at"), p["thread_id"]),
)
def get_thread(conn: Connection, thread_id: str) -> dict | None:
row = conn.execute(
"SELECT thread_id, chat_id, title, summary, status, "
"opened_at, closed_at, last_referenced_scene_id, "
"created_at, updated_at "
"FROM threads WHERE thread_id = ?",
(thread_id,),
).fetchone()
if not row:
return None
return {
"thread_id": row[0],
"chat_id": row[1],
"title": row[2],
"summary": row[3],
"status": row[4],
"opened_at": row[5],
"closed_at": row[6],
"last_referenced_scene_id": row[7],
"created_at": row[8],
"updated_at": row[9],
}
def list_open_threads(conn: Connection, chat_id: str) -> list[dict]:
rows = conn.execute(
"SELECT thread_id, chat_id, title, summary, status, "
"opened_at, closed_at, last_referenced_scene_id, "
"created_at, updated_at "
"FROM threads WHERE chat_id = ? AND status = 'open' "
"ORDER BY id ASC",
(chat_id,),
).fetchall()
return [
{
"thread_id": r[0], "chat_id": r[1], "title": r[2],
"summary": r[3], "status": r[4],
"opened_at": r[5], "closed_at": r[6],
"last_referenced_scene_id": r[7],
"created_at": r[8], "updated_at": r[9],
}
for r in rows
]
def list_threads(conn: Connection, chat_id: str, status: str | None = None) -> list[dict]:
if status is None:
rows = conn.execute(
"SELECT thread_id, chat_id, title, summary, status, "
"opened_at, closed_at, last_referenced_scene_id, "
"created_at, updated_at "
"FROM threads WHERE chat_id = ? ORDER BY id ASC",
(chat_id,),
).fetchall()
else:
rows = conn.execute(
"SELECT thread_id, chat_id, title, summary, status, "
"opened_at, closed_at, last_referenced_scene_id, "
"created_at, updated_at "
"FROM threads WHERE chat_id = ? AND status = ? "
"ORDER BY id ASC",
(chat_id, status),
).fetchall()
return [
{
"thread_id": r[0], "chat_id": r[1], "title": r[2],
"summary": r[3], "status": r[4],
"opened_at": r[5], "closed_at": r[6],
"last_referenced_scene_id": r[7],
"created_at": r[8], "updated_at": r[9],
}
for r in rows
]
+181
View File
@@ -0,0 +1,181 @@
from __future__ import annotations
from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_and_apply, append_event
from chat.eventlog.projector import project
import chat.state.entities # registers handlers
import chat.state.world # registers handlers
import chat.state.threads # registers handlers
from chat.state.threads import get_thread, list_open_threads
def _bot_payload(bot_id: str, name: str) -> dict:
return {
"id": bot_id,
"name": name,
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "coworker",
"kickoff_prose": "",
}
def _chat_payload(chat_id: str = "chat_bot_a") -> dict:
return {
"id": chat_id,
"host_bot_id": "bot_a",
"guest_bot_id": "bot_b",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1 evening",
"weather": "clear",
}
def test_thread_opened_creates_row(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="chat_created", payload=_chat_payload())
append_event(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_abc",
"chat_id": "chat_bot_a",
"title": "Maya's job hunt",
"summary": "Maya is looking for a new job",
},
)
project(conn)
t = get_thread(conn, "thr_abc")
assert t is not None
assert t["thread_id"] == "thr_abc"
assert t["chat_id"] == "chat_bot_a"
assert t["title"] == "Maya's job hunt"
assert t["summary"] == "Maya is looking for a new job"
assert t["status"] == "open"
assert t["closed_at"] is None
assert t["last_referenced_scene_id"] is None
def test_thread_updated_changes_summary_and_last_referenced(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="chat_created", payload=_chat_payload())
append_event(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_abc",
"chat_id": "chat_bot_a",
"title": "Maya's job hunt",
"summary": "Maya is looking for a new job",
},
)
append_event(
conn,
kind="thread_updated",
payload={
"thread_id": "thr_abc",
"summary": "Maya landed an interview at a startup",
"last_referenced_scene_id": 42,
},
)
project(conn)
t = get_thread(conn, "thr_abc")
assert t is not None
assert t["summary"] == "Maya landed an interview at a startup"
assert t["last_referenced_scene_id"] == 42
assert t["status"] == "open"
def test_thread_closed_terminal(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="chat_created", payload=_chat_payload())
append_event(
conn,
kind="thread_opened",
payload={
"thread_id": "thr_abc",
"chat_id": "chat_bot_a",
"title": "Maya's job hunt",
"summary": "Maya is looking for a new job",
},
)
append_event(
conn,
kind="thread_closed",
payload={
"thread_id": "thr_abc",
"closed_at": "2026-04-26T21:00:00+00:00",
},
)
project(conn)
t = get_thread(conn, "thr_abc")
assert t is not None
assert t["status"] == "closed"
assert t["closed_at"] == "2026-04-26T21:00:00+00:00"
# Subsequent updates to a closed thread are no-ops.
append_and_apply(
conn,
kind="thread_updated",
payload={
"thread_id": "thr_abc",
"summary": "should not be applied",
},
)
t2 = get_thread(conn, "thr_abc")
assert t2 is not None
assert t2["summary"] == "Maya is looking for a new job"
assert t2["status"] == "closed"
def test_list_open_threads_filters_to_open_only(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="chat_created", payload=_chat_payload())
for tid, title in [
("thr_1", "Arc 1"),
("thr_2", "Arc 2"),
("thr_3", "Arc 3"),
]:
append_event(
conn,
kind="thread_opened",
payload={
"thread_id": tid,
"chat_id": "chat_bot_a",
"title": title,
"summary": "",
},
)
append_event(
conn,
kind="thread_closed",
payload={
"thread_id": "thr_3",
"closed_at": "2026-04-26T21:00:00+00:00",
},
)
project(conn)
open_threads = list_open_threads(conn, "chat_bot_a")
assert len(open_threads) == 2
ids = {t["thread_id"] for t in open_threads}
assert ids == {"thr_1", "thr_2"}