diff --git a/chat/eventlog/log.py b/chat/eventlog/log.py index ad228da..66c5161 100644 --- a/chat/eventlog/log.py +++ b/chat/eventlog/log.py @@ -1,8 +1,13 @@ from __future__ import annotations +import asyncio import json +import logging from dataclasses import dataclass -from typing import Any, Iterator -from sqlite3 import Connection +from typing import Any, Callable, ContextManager, Iterator +from sqlite3 import Connection, OperationalError + + +_log = logging.getLogger(__name__) @dataclass @@ -63,6 +68,52 @@ def append_and_apply( return eid +async def append_and_apply_with_retry( + conn_factory: Callable[[], ContextManager[Connection]], + *, + kind: str, + payload: dict[str, Any], + branch_id: int = 1, + attempts: int = 30, + base_sleep_s: float = 0.05, + max_sleep_s: float = 0.5, +) -> int | None: + """Append-and-apply that retries on ``database is locked``. + + Background workers (embedding indexer, significance scorer) write + events to the same SQLite file as the request handler. The chat + app sets a tight ``busy_timeout=100ms`` on every connection so a + contending worker can't freeze the request's asyncio event loop. + This helper restores durability for workers: it retries up to + ``attempts`` times with exponential backoff (capped at + ``max_sleep_s``) until the lock clears. + + Returns the appended event's id, or ``None`` if all retries failed + (logged at WARNING). Each retry opens a fresh connection via + ``conn_factory`` because the failed write may have left the prior + connection in an unusable state. + """ + sleep = base_sleep_s + for attempt in range(attempts): + try: + with conn_factory() as conn: + return append_and_apply( + conn, kind=kind, payload=payload, branch_id=branch_id + ) + except OperationalError as exc: + if "database is locked" not in str(exc).lower(): + raise + if attempt == attempts - 1: + _log.warning( + "append_and_apply_with_retry: gave up after %d attempts " + "(kind=%s): %s", + attempts, kind, exc, + ) + return None + await asyncio.sleep(sleep) + sleep = min(sleep * 2, max_sleep_s) + + def read_events(conn: Connection, branch_id: int = 1, after_id: int = 0) -> Iterator[Event]: cur = conn.execute( "SELECT id, branch_id, ts, kind, payload_json, superseded_by, hidden " diff --git a/chat/services/background.py b/chat/services/background.py index 27eecaa..4a08f79 100644 --- a/chat/services/background.py +++ b/chat/services/background.py @@ -30,7 +30,7 @@ from typing import Callable from chat.config import Settings from chat.db.connection import open_db -from chat.eventlog.log import append_and_apply +from chat.eventlog.log import append_and_apply, append_and_apply_with_retry from chat.llm.client import LLMClient from chat.services.backup import ( prune_backups, @@ -169,16 +169,22 @@ class BackgroundWorker: narrative_text=job.narrative_text, prior_dialogue=job.prior_dialogue, ) - with open_db(self._settings.db_path) as conn: - append_and_apply( - conn, - kind="memory_significance_set", - payload={ - "memory_id": job.memory_id, - "significance": score, - }, - ) - if score >= 3: + # Retry-on-lock: see chat/eventlog/log.py's + # ``append_and_apply_with_retry`` docstring for why workers + # need to retry while the request handler's open transaction + # holds the WAL write lock briefly. + appended_id = await append_and_apply_with_retry( + lambda: open_db(self._settings.db_path), + kind="memory_significance_set", + payload={ + "memory_id": job.memory_id, + "significance": score, + }, + ) + # Auto-pin requires a separate connection because retry-helper + # closed its own. Skip if the significance event itself failed. + if appended_id is not None and score >= 3: + with open_db(self._settings.db_path) as conn: _auto_pin_with_cap( conn, owner_id=job.host_bot_id, diff --git a/chat/services/embedding_worker.py b/chat/services/embedding_worker.py index 80f87d8..5051ba4 100644 --- a/chat/services/embedding_worker.py +++ b/chat/services/embedding_worker.py @@ -26,7 +26,7 @@ from dataclasses import dataclass from sqlite3 import Connection from typing import Callable -from chat.eventlog.log import append_and_apply +from chat.eventlog.log import append_and_apply_with_retry from chat.services.embeddings import ( DEFAULT_EMBEDDING_DIM, DEFAULT_EMBEDDING_MODEL, @@ -121,17 +121,22 @@ class EmbeddingWorker: job.memory_id, ) return - with self._conn_factory() as conn: - append_and_apply( - conn, - kind="embedding_indexed", - payload={ - "memory_id": job.memory_id, - "model": result.model, - "dim": result.dim, - "vector": result.vector, - }, - ) + # Retry-on-lock: the request handler holds an open transaction + # for the duration of post_turn (a few seconds), so any worker + # write started during that window blocks. open_db's + # busy_timeout is 100ms (so the request path itself can't get + # stuck on a worker), so retry here with backoff. Each retry + # opens a fresh connection via ``conn_factory``. + await append_and_apply_with_retry( + self._conn_factory, + kind="embedding_indexed", + payload={ + "memory_id": job.memory_id, + "model": result.model, + "dim": result.dim, + "vector": result.vector, + }, + ) __all__ = ["EmbeddingJob", "EmbeddingWorker"]