Files
chat/chat/state/events.py
T
Joseph Doherty 6d4ad86e33 feat: event_status_reverted event kind + projector handler (T114.2)
Adds the inverse projection used by T114.3's regenerate rollback. The
new ``event_status_reverted`` event kind carries
``{event_id, prior_status}`` and the handler unconditionally sets
``events.status = prior_status`` for the row.

Unlike the forward transitions (event_started / event_completed /
event_cancelled), this handler does NOT guard against terminal
statuses — its entire purpose is to reverse a transition, including
walking back from a terminal status to a non-terminal one. Without
that, rolling back an event_completed (status='completed' is terminal
for the forward handlers) would silently no-op and leave the row in
the post-superseded state.

The handler registers via the existing ``@on(kind)`` decorator pattern
in chat/eventlog/projector.py, so future replays of an event_log that
contains event_status_reverted rows pick it up automatically.

Test exercises completed→active, active→planned, and cancelled→active
round-trips.
2026-04-27 06:39:03 -04:00

151 lines
5.0 KiB
Python

from __future__ import annotations
import json
from sqlite3 import Connection
from chat.eventlog.projector import on
from chat.eventlog.log import Event
# Statuses treated as terminal for an event row. The forward lifecycle
# handlers in this module spell out the same three values literally in their
# SQL ``status NOT IN (...)`` guards, so keep both in sync when a status is
# added or removed.
_TERMINAL_STATUSES = {"completed", "cancelled", "expired"}
@on("event_planned")
def _apply_event_planned(conn: Connection, e: Event) -> None:
    """Project an ``event_planned`` log entry into the ``events`` table.

    Inserts a row with status ``'planned'``. The payload must carry
    ``event_id``, ``chat_id`` and ``kind``; ``props`` is optional and is
    stored as JSON (``{}`` when absent), and ``planned_for`` is optional
    (stored as NULL when absent).

    The statement is ``INSERT OR IGNORE``, so a row that would conflict with
    an existing one is skipped instead of raising.
    """
    payload = e.payload
    sql = (
        "INSERT OR IGNORE INTO events "
        "(event_id, chat_id, kind, status, props_json, planned_for) "
        "VALUES (?, ?, ?, 'planned', ?, ?)"
    )
    # Values are listed in the same order as the column list above.
    params = (
        payload["event_id"],
        payload["chat_id"],
        payload["kind"],
        json.dumps(payload.get("props", {})),
        payload.get("planned_for"),
    )
    conn.execute(sql, params)
@on("event_started")
def _apply_event_started(conn: Connection, e: Event) -> None:
    """Move an event to ``'active'`` and record its ``started_at`` value.

    The payload must carry ``event_id``; ``started_at`` is optional and is
    written as NULL when absent. ``updated_at`` is refreshed to the current
    database time.

    The WHERE clause excludes rows whose status is already completed,
    cancelled or expired, so applying this to a finished event changes
    nothing.
    """
    payload = e.payload
    sql = (
        "UPDATE events SET status = 'active', started_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    params = (payload.get("started_at"), payload["event_id"])
    conn.execute(sql, params)
@on("event_completed")
def _apply_event_completed(conn: Connection, e: Event) -> None:
    """Move an event to ``'completed'`` and record its ``completed_at`` value.

    The payload must carry ``event_id``; ``completed_at`` is optional and is
    written as NULL when absent. ``updated_at`` is refreshed to the current
    database time.

    Rows already in a terminal status (completed, cancelled, expired) are
    excluded by the WHERE clause and therefore left unchanged.
    """
    payload = e.payload
    sql = (
        "UPDATE events SET status = 'completed', completed_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    params = (payload.get("completed_at"), payload["event_id"])
    conn.execute(sql, params)
@on("event_cancelled")
def _apply_event_cancelled(conn: Connection, e: Event) -> None:
    """Move an event to ``'cancelled'``.

    The payload must carry ``event_id``. The optional ``completed_at``
    payload value is written to the ``completed_at`` column (NULL when
    absent), and ``updated_at`` is refreshed to the current database time.

    Rows already in a terminal status (completed, cancelled, expired) are
    excluded by the WHERE clause and therefore left unchanged.
    """
    payload = e.payload
    sql = (
        "UPDATE events SET status = 'cancelled', completed_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    params = (payload.get("completed_at"), payload["event_id"])
    conn.execute(sql, params)
@on("event_expired")
def _apply_event_expired(conn: Connection, e: Event) -> None:
    """Move an event to ``'expired'``.

    The payload must carry ``event_id``. The optional ``completed_at``
    payload value is written to the ``completed_at`` column (NULL when
    absent), and ``updated_at`` is refreshed to the current database time.

    Rows already in a terminal status (completed, cancelled, expired) are
    excluded by the WHERE clause and therefore left unchanged.
    """
    payload = e.payload
    sql = (
        "UPDATE events SET status = 'expired', completed_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    params = (payload.get("completed_at"), payload["event_id"])
    conn.execute(sql, params)
@on("event_status_reverted")
def _apply_event_status_reverted(conn: Connection, e: Event) -> None:
    """T114.2: Put an event row's status back to ``prior_status``.

    This is the inverse projection for the lifecycle transitions
    (event_started / event_completed / event_cancelled). It is emitted by
    ``regenerate_assistant_turn`` when a superseded turn had triggered one
    of those transitions, so that the row returns to the status it held
    *before* the now-superseded transition fired.

    The forward handlers refuse to overwrite a terminal status; this one
    deliberately has no such guard, because undoing a transition may mean
    going from a terminal status (completed/cancelled) back to a
    non-terminal one.

    The payload must carry ``event_id`` and ``prior_status``. Only
    ``status`` and ``updated_at`` are written; ``started_at`` and
    ``completed_at`` keep whatever values they already had.
    """
    payload = e.payload
    sql = (
        "UPDATE events SET status = ?, updated_at = datetime('now') "
        "WHERE event_id = ?"
    )
    params = (payload["prior_status"], payload["event_id"])
    conn.execute(sql, params)
def get_event(conn: Connection, event_id: str) -> dict | None:
    """Look up a single event row by its ``event_id``.

    Returns a dict with the keys ``event_id``, ``chat_id``, ``kind``,
    ``status``, ``props`` (the decoded ``props_json`` column),
    ``planned_for``, ``started_at``, ``completed_at``, ``created_at`` and
    ``updated_at``, or ``None`` when no row matches.
    """
    query = (
        "SELECT event_id, chat_id, kind, status, props_json, planned_for, "
        "started_at, completed_at, created_at, updated_at "
        "FROM events WHERE event_id = ?"
    )
    record = conn.execute(query, (event_id,)).fetchone()
    if not record:
        return None
    # Unpack in SELECT order so each column gets a readable name.
    (
        found_id, chat_id, kind, status, props_json,
        planned_for, started_at, completed_at, created_at, updated_at,
    ) = record
    return {
        "event_id": found_id,
        "chat_id": chat_id,
        "kind": kind,
        "status": status,
        "props": json.loads(props_json),
        "planned_for": planned_for,
        "started_at": started_at,
        "completed_at": completed_at,
        "created_at": created_at,
        "updated_at": updated_at,
    }
def list_active_events(conn: Connection, chat_id: str) -> list[dict]:
    """Return the events of ``chat_id`` whose status is planned or active.

    Rows are ordered by the table's ``id`` column, ascending. Each entry is
    a dict with the keys ``event_id``, ``chat_id``, ``kind``, ``status``,
    ``props`` (the decoded ``props_json`` column), ``planned_for``,
    ``started_at``, ``completed_at``, ``created_at`` and ``updated_at``.
    An empty list is returned when nothing matches.
    """
    query = (
        "SELECT event_id, chat_id, kind, status, props_json, planned_for, "
        "started_at, completed_at, created_at, updated_at "
        "FROM events WHERE chat_id = ? AND status IN ('planned','active') "
        "ORDER BY id ASC"
    )
    events: list[dict] = []
    # Unpack each row in SELECT order so the columns get readable names.
    for (
        event_id, row_chat_id, kind, status, props_json,
        planned_for, started_at, completed_at, created_at, updated_at,
    ) in conn.execute(query, (chat_id,)).fetchall():
        events.append(
            {
                "event_id": event_id,
                "chat_id": row_chat_id,
                "kind": kind,
                "status": status,
                "props": json.loads(props_json),
                "planned_for": planned_for,
                "started_at": started_at,
                "completed_at": completed_at,
                "created_at": created_at,
                "updated_at": updated_at,
            }
        )
    return events
def list_events_in_status(conn: Connection, chat_id: str, status: str) -> list[dict]:
    """Return the events of ``chat_id`` whose status equals ``status``.

    Rows are ordered by the table's ``id`` column, ascending. Each entry is
    a dict with the keys ``event_id``, ``chat_id``, ``kind``, ``status``,
    ``props`` (the decoded ``props_json`` column), ``planned_for``,
    ``started_at``, ``completed_at``, ``created_at`` and ``updated_at``.
    An empty list is returned when nothing matches.
    """
    query = (
        "SELECT event_id, chat_id, kind, status, props_json, planned_for, "
        "started_at, completed_at, created_at, updated_at "
        "FROM events WHERE chat_id = ? AND status = ? ORDER BY id ASC"
    )
    matches: list[dict] = []
    # Unpack each row in SELECT order so the columns get readable names.
    for (
        event_id, row_chat_id, kind, row_status, props_json,
        planned_for, started_at, completed_at, created_at, updated_at,
    ) in conn.execute(query, (chat_id, status)).fetchall():
        matches.append(
            {
                "event_id": event_id,
                "chat_id": row_chat_id,
                "kind": kind,
                "status": row_status,
                "props": json.loads(props_json),
                "planned_for": planned_for,
                "started_at": started_at,
                "completed_at": completed_at,
                "created_at": created_at,
                "updated_at": updated_at,
            }
        )
    return matches