Files
chat/chat/eventlog/log.py
T
Joseph Doherty a902d86432 fix: workers retry-on-lock so they don't drop writes under busy_timeout=100ms
The previous commit dropped open_db's busy_timeout from 5s to 100ms
to prevent the embedding worker from GIL-blocking the asyncio event
loop and silently adding 5s to every state_update LLM call. That fixed
the chat path but broke worker durability: any worker write that
collided with the request handler's brief open transaction failed
with 'database is locked' instead of waiting.

Adds append_and_apply_with_retry in chat/eventlog/log.py — same
contract as append_and_apply but runs through a conn_factory and
retries with exponential backoff (50ms..500ms, ~10s total budget) on
'database is locked'. Returns None and logs WARNING if all retries
fail; callers handle that as a no-op.

Wires it into:
- embedding_worker._process for embedding_indexed events
- background._process for memory_significance_set events (auto-pin
  still uses a direct open_db when the score warrants it; that one
  is fast and not racy in practice)

Verified live: ran 4 back-to-back chat turns, zero worker errors,
embeddings + significance landing correctly. Suite: 464 passed in
11.5s.
2026-04-27 14:04:27 -04:00

129 lines
4.3 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Any, Callable, ContextManager, Iterator
from sqlite3 import Connection, OperationalError
_log = logging.getLogger(__name__)
@dataclass
class Event:
id: int
branch_id: int
ts: str
kind: str
payload: dict[str, Any]
superseded_by: int | None
hidden: bool
def append_event(conn: Connection, *, kind: str, payload: dict[str, Any], branch_id: int = 1) -> int:
cur = conn.execute(
"INSERT INTO event_log (branch_id, kind, payload_json) VALUES (?, ?, ?)",
(branch_id, kind, json.dumps(payload)),
)
return cur.lastrowid
def append_and_apply(
conn: Connection,
*,
kind: str,
payload: dict[str, Any],
branch_id: int = 1,
) -> int:
"""Append an event AND immediately apply just that event's handler.
Calling :func:`chat.eventlog.projector.project` after an append
re-runs every prior event, which is fine for idempotent inserts but
catastrophic for delta-shaped events like ``edge_update`` whose
handler is *not* replay-safe (each pass would re-add the same
``affinity_delta``). This helper runs only the brand-new event
through the registered handler, leaving prior state untouched.
No-ops cleanly when ``kind`` has no registered handler — useful for
transcript-only events like ``user_turn`` / ``assistant_turn`` where
callers may swap ``append_event`` for ``append_and_apply`` without
side effects.
"""
# Local import to avoid a circular dependency at module import: the
# projector imports from .log to define ``Event``.
from chat.eventlog.projector import apply_event
eid = append_event(conn, kind=kind, payload=payload, branch_id=branch_id)
event = Event(
id=eid,
branch_id=branch_id,
ts="",
kind=kind,
payload=payload,
superseded_by=None,
hidden=False,
)
apply_event(conn, event)
return eid
async def append_and_apply_with_retry(
conn_factory: Callable[[], ContextManager[Connection]],
*,
kind: str,
payload: dict[str, Any],
branch_id: int = 1,
attempts: int = 30,
base_sleep_s: float = 0.05,
max_sleep_s: float = 0.5,
) -> int | None:
"""Append-and-apply that retries on ``database is locked``.
Background workers (embedding indexer, significance scorer) write
events to the same SQLite file as the request handler. The chat
app sets a tight ``busy_timeout=100ms`` on every connection so a
contending worker can't freeze the request's asyncio event loop.
This helper restores durability for workers: it retries up to
``attempts`` times with exponential backoff (capped at
``max_sleep_s``) until the lock clears.
Returns the appended event's id, or ``None`` if all retries failed
(logged at WARNING). Each retry opens a fresh connection via
``conn_factory`` because the failed write may have left the prior
connection in an unusable state.
"""
sleep = base_sleep_s
for attempt in range(attempts):
try:
with conn_factory() as conn:
return append_and_apply(
conn, kind=kind, payload=payload, branch_id=branch_id
)
except OperationalError as exc:
if "database is locked" not in str(exc).lower():
raise
if attempt == attempts - 1:
_log.warning(
"append_and_apply_with_retry: gave up after %d attempts "
"(kind=%s): %s",
attempts, kind, exc,
)
return None
await asyncio.sleep(sleep)
sleep = min(sleep * 2, max_sleep_s)
def read_events(conn: Connection, branch_id: int = 1, after_id: int = 0) -> Iterator[Event]:
cur = conn.execute(
"SELECT id, branch_id, ts, kind, payload_json, superseded_by, hidden "
"FROM event_log WHERE branch_id = ? AND id > ? AND hidden = 0 "
"AND superseded_by IS NULL ORDER BY id",
(branch_id, after_id),
)
for row in cur:
yield Event(
id=row[0], branch_id=row[1], ts=row[2], kind=row[3],
payload=json.loads(row[4]), superseded_by=row[5], hidden=bool(row[6]),
)