Files
chat/chat/eventlog/log.py
T
2026-04-26 13:17:07 -04:00

78 lines
2.4 KiB
Python

from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, Iterator
from sqlite3 import Connection
@dataclass
class Event:
id: int
branch_id: int
ts: str
kind: str
payload: dict[str, Any]
superseded_by: int | None
hidden: bool
def append_event(conn: Connection, *, kind: str, payload: dict[str, Any], branch_id: int = 1) -> int:
    """Insert one event row into ``event_log`` and return its row id.

    The payload is stored as JSON text in the ``payload_json`` column.
    This function does not commit; transaction handling is left to the
    caller.
    """
    insert_sql = "INSERT INTO event_log (branch_id, kind, payload_json) VALUES (?, ?, ?)"
    serialized = json.dumps(payload)
    cursor = conn.execute(insert_sql, (branch_id, kind, serialized))
    return cursor.lastrowid
def append_and_apply(
    conn: Connection,
    *,
    kind: str,
    payload: dict[str, Any],
    branch_id: int = 1,
) -> int:
    """Append an event AND immediately apply just that event's handler.

    Calling :func:`chat.eventlog.projector.project` after an append
    re-runs every prior event, which is fine for idempotent inserts but
    catastrophic for delta-shaped events like ``edge_update`` whose
    handler is *not* replay-safe (each pass would re-add the same
    ``affinity_delta``). This helper runs only the brand-new event
    through the registered handler, leaving prior state untouched.

    No-ops cleanly when ``kind`` has no registered handler -- useful for
    transcript-only events like ``user_turn`` / ``assistant_turn`` where
    callers may swap ``append_event`` for ``append_and_apply`` without
    side effects.

    The event handed to the handler carries the ``ts`` value stored for
    the new row, so a handler sees the same timestamp live as it would
    when the event is later read back via :func:`read_events`.

    Returns:
        The id of the newly inserted ``event_log`` row.
    """
    # Local import to avoid a circular dependency at module import: the
    # projector imports from .log to define ``Event``.
    from chat.eventlog.projector import apply_event

    eid = append_event(conn, kind=kind, payload=payload, branch_id=branch_id)

    # Read back the timestamp the database recorded for this row instead
    # of handing the handler a placeholder. The same connection sees its
    # own uncommitted insert, so no commit is required first.
    row = conn.execute("SELECT ts FROM event_log WHERE id = ?", (eid,)).fetchone()
    # Fall back to the empty string so ``Event.ts`` is always a ``str``.
    ts = row[0] if row is not None and row[0] is not None else ""

    # NOTE(review): ``payload`` is passed through as given, whereas replay
    # via ``read_events`` yields the JSON-decoded form; confirm handlers do
    # not depend on non-JSON types (tuples, non-string keys).
    event = Event(
        id=eid,
        branch_id=branch_id,
        ts=ts,
        kind=kind,
        payload=payload,
        superseded_by=None,
        hidden=False,
    )
    apply_event(conn, event)
    return eid
def read_events(conn: Connection, branch_id: int = 1, after_id: int = 0) -> Iterator[Event]:
    """Lazily yield the live events of one branch in ascending id order.

    Only rows with ``id`` greater than ``after_id`` are returned; rows
    that are hidden or have a non-NULL ``superseded_by`` are filtered
    out by the query. Each ``payload_json`` value is decoded into a dict.
    """
    query = (
        "SELECT id, branch_id, ts, kind, payload_json, superseded_by, hidden "
        "FROM event_log WHERE branch_id = ? AND id > ? AND hidden = 0 "
        "AND superseded_by IS NULL ORDER BY id"
    )
    rows = conn.execute(query, (branch_id, after_id))
    # Unpack by column name rather than by position for readability.
    for event_id, row_branch, ts, kind, payload_json, superseded_by, hidden in rows:
        yield Event(
            id=event_id,
            branch_id=row_branch,
            ts=ts,
            kind=kind,
            payload=json.loads(payload_json),
            superseded_by=superseded_by,
            hidden=bool(hidden),
        )