feat: append-only event log with projector skeleton
This commit is contained in:
@@ -0,0 +1,10 @@
|
|||||||
|
CREATE TABLE event_log (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
branch_id INTEGER NOT NULL DEFAULT 1,
|
||||||
|
ts TEXT NOT NULL DEFAULT (datetime('now')),
|
||||||
|
kind TEXT NOT NULL,
|
||||||
|
payload_json TEXT NOT NULL,
|
||||||
|
superseded_by INTEGER REFERENCES event_log(id),
|
||||||
|
hidden INTEGER NOT NULL DEFAULT 0
|
||||||
|
);
|
||||||
|
CREATE INDEX idx_event_log_branch_kind ON event_log(branch_id, kind);
|
||||||
@@ -0,0 +1,38 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
import json
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Iterator
|
||||||
|
from sqlite3 import Connection
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Event:
|
||||||
|
id: int
|
||||||
|
branch_id: int
|
||||||
|
ts: str
|
||||||
|
kind: str
|
||||||
|
payload: dict[str, Any]
|
||||||
|
superseded_by: int | None
|
||||||
|
hidden: bool
|
||||||
|
|
||||||
|
|
||||||
|
def append_event(conn: Connection, *, kind: str, payload: dict[str, Any], branch_id: int = 1) -> int:
|
||||||
|
cur = conn.execute(
|
||||||
|
"INSERT INTO event_log (branch_id, kind, payload_json) VALUES (?, ?, ?)",
|
||||||
|
(branch_id, kind, json.dumps(payload)),
|
||||||
|
)
|
||||||
|
return cur.lastrowid
|
||||||
|
|
||||||
|
|
||||||
|
def read_events(conn: Connection, branch_id: int = 1, after_id: int = 0) -> Iterator[Event]:
|
||||||
|
cur = conn.execute(
|
||||||
|
"SELECT id, branch_id, ts, kind, payload_json, superseded_by, hidden "
|
||||||
|
"FROM event_log WHERE branch_id = ? AND id > ? AND hidden = 0 "
|
||||||
|
"AND superseded_by IS NULL ORDER BY id",
|
||||||
|
(branch_id, after_id),
|
||||||
|
)
|
||||||
|
for row in cur:
|
||||||
|
yield Event(
|
||||||
|
id=row[0], branch_id=row[1], ts=row[2], kind=row[3],
|
||||||
|
payload=json.loads(row[4]), superseded_by=row[5], hidden=bool(row[6]),
|
||||||
|
)
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
from collections.abc import Callable
|
||||||
|
from sqlite3 import Connection
|
||||||
|
from .log import Event, read_events
|
||||||
|
|
||||||
|
Handler = Callable[[Connection, Event], None]
|
||||||
|
_REGISTRY: dict[str, Handler] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def on(kind: str):
|
||||||
|
def deco(fn: Handler) -> Handler:
|
||||||
|
_REGISTRY[kind] = fn
|
||||||
|
return fn
|
||||||
|
return deco
|
||||||
|
|
||||||
|
|
||||||
|
def project(conn: Connection, branch_id: int = 1) -> None:
|
||||||
|
for event in read_events(conn, branch_id=branch_id):
|
||||||
|
h = _REGISTRY.get(event.kind)
|
||||||
|
if h:
|
||||||
|
h(conn, event)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_event(conn: Connection, event: Event) -> None:
|
||||||
|
h = _REGISTRY.get(event.kind)
|
||||||
|
if h:
|
||||||
|
h(conn, event)
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
from chat.db.migrate import apply_migrations
|
||||||
|
from chat.db.connection import open_db
|
||||||
|
from chat.eventlog.log import append_event, read_events
|
||||||
|
|
||||||
|
|
||||||
|
def test_append_and_read(tmp_path):
|
||||||
|
db = tmp_path / "t.db"
|
||||||
|
apply_migrations(db)
|
||||||
|
with open_db(db) as conn:
|
||||||
|
eid = append_event(conn, kind="test_kind", payload={"a": 1})
|
||||||
|
assert eid > 0
|
||||||
|
rows = list(read_events(conn))
|
||||||
|
assert len(rows) == 1
|
||||||
|
assert rows[0].kind == "test_kind"
|
||||||
|
assert rows[0].payload["a"] == 1
|
||||||
Reference in New Issue
Block a user