Files
chat/chat/services/background.py
T
2026-04-26 14:18:57 -04:00

263 lines
9.2 KiB
Python

"""Async background worker for post-turn jobs (T22).
The turn flow records a ``memory_written`` event synchronously on the
request path so the timeline updates immediately. Significance scoring is
a separate classifier round-trip that we don't want to block on, so the
turn handler enqueues a :class:`SignificanceJob` here and the worker
drains the queue out-of-band.
A single :class:`BackgroundWorker` is started/stopped via FastAPI lifespan
in :mod:`chat.app`. The worker owns its own ``asyncio.Queue`` and runs
exactly one task that pulls jobs off the queue, calls
:func:`chat.services.significance.compute_significance`, and writes
``memory_significance_set`` (and on score 3, ``memory_pin_changed``)
events. Each job opens its own DB connection — workers and request
handlers don't share connections.
Failures inside ``_process`` are logged and swallowed: a flaky classifier
shouldn't take down the worker. Tests can turn ``enqueue()`` into a no-op
by constructing the worker with ``enabled=False`` or by setting
``enabled = False`` on the worker instance (e.g. in the existing turn-flow
fixture, which doesn't have a usable LLM key for the lifespan-managed
factory).
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from typing import Callable
from chat.config import Settings
from chat.db.connection import open_db
from chat.eventlog.log import append_and_apply
from chat.llm.client import LLMClient
from chat.services.backup import (
prune_backups,
should_take_backup,
take_backup,
)
from chat.services.significance import compute_significance
from chat.services.snapshot import (
prune_periodic_snapshots,
should_take_periodic_snapshot,
take_snapshot,
)
# T32: tick-loop wake interval, in seconds. Used as the ``wait_for`` timeout
# in ``BackgroundWorker._tick_loop``. 60s gives a single backup window per
# target hour with plenty of slack: should_take_backup's 23h freshness
# guard prevents back-to-back runs.
BACKUP_TICK_INTERVAL_SECONDS = 60.0
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
@dataclass
class SignificanceJob:
    """Queue payload describing one memory awaiting a significance score.

    The background worker consumes these: the text fields feed the
    classifier call, and the ids identify which memory (and which owner's
    pin set) the resulting events apply to.
    """

    # Id of the memory the ``memory_significance_set`` event is written for.
    memory_id: int
    # Narrative text handed to the significance classifier.
    narrative_text: str
    # Preceding dialogue turns handed to the classifier alongside the text.
    prior_dialogue: list[dict]
    # Owner of the memory; passed as ``owner_id`` to the auto-pin soft-cap
    # logic, so it scopes both the pin count and the eviction candidate.
    host_bot_id: str
class BackgroundWorker:
    """asyncio.Queue-backed single-worker task.

    Started on app startup; ``stop()`` enqueues a sentinel and awaits the
    task so any in-flight job has a chance to finish. Jobs enqueued after
    the sentinel are not processed by that run — Phase 1 simplification.

    :meth:`start` spawns two tasks: the job loop (:meth:`_run`), which
    drains the significance queue, and the tick loop (:meth:`_tick_loop`),
    which polls for a due nightly backup. The worker can be restarted:
    :meth:`start` re-arms the tick loop's stop event.
    """

    def __init__(
        self,
        settings: Settings,
        llm_client_factory: Callable[[], LLMClient],
        *,
        enabled: bool = True,
    ) -> None:
        """Store configuration; no tasks are created until :meth:`start`.

        Args:
            settings: Source of ``db_path``, ``data_dir`` and
                ``classifier_model``.
            llm_client_factory: Zero-argument callable invoked once per
                job to obtain the LLM client.
            enabled: When ``False``, :meth:`enqueue` silently drops jobs.
        """
        self._settings = settings
        self._llm_client_factory = llm_client_factory
        self._queue: asyncio.Queue[SignificanceJob | None] = asyncio.Queue()
        self._task: asyncio.Task | None = None
        # T32: nightly-backup tick loop runs alongside the job loop. The
        # event is set by stop() to wake the loop early so shutdown is
        # snappy even mid-tick.
        self._tick_task: asyncio.Task | None = None
        self._tick_stop: asyncio.Event = asyncio.Event()
        self.enabled = enabled

    async def start(self) -> None:
        """Spawn the job loop and the tick loop; no-op if already started."""
        if self._task is not None:
            return
        # stop() leaves the event set. Without re-arming it here, a
        # restarted worker's tick loop would see the stale flag on its
        # first check and exit immediately, silently disabling backups.
        self._tick_stop.clear()
        self._task = asyncio.create_task(self._run())
        self._tick_task = asyncio.create_task(self._tick_loop())

    async def stop(self) -> None:
        """Shut both loops down and wait for them to finish."""
        # Stop the tick loop first — it has no in-flight work to drain,
        # so signalling early lets it exit while the job loop is still
        # finishing its sentinel handoff.
        self._tick_stop.set()
        if self._tick_task is not None:
            await self._tick_task
            self._tick_task = None
        if self._task is None:
            return
        await self._queue.put(None)  # sentinel
        await self._task
        self._task = None

    def enqueue(self, job: SignificanceJob) -> None:
        """Queue ``job`` for scoring; dropped when ``enabled`` is false."""
        if not self.enabled:
            return
        self._queue.put_nowait(job)

    async def _run(self) -> None:
        """Job loop: process queued jobs one at a time until the sentinel."""
        while True:
            job = await self._queue.get()
            if job is None:
                return
            try:
                await self._process(job)
            except Exception as exc:  # noqa: BLE001 — worker must not die
                log.exception("significance job failed: %s", exc)

    async def _tick_loop(self) -> None:
        """Periodic-operations loop (T32 nightly backup).

        Wakes every :data:`BACKUP_TICK_INTERVAL_SECONDS` seconds and
        asks :func:`should_take_backup` whether a backup is due. The
        scheduling decision lives in the backup module so we don't
        duplicate the "is it 03:00?" logic here. Failures are caught
        and logged so a flaky disk doesn't kill the loop — the next
        tick will retry.

        Wait uses :func:`asyncio.wait_for` on ``_tick_stop`` so that
        :meth:`stop` can interrupt a sleeping tick instead of having to
        wait the full interval.
        """
        while not self._tick_stop.is_set():
            try:
                # NOTE(review): these calls are synchronous, so the event
                # loop is held for the duration of the backup — consider
                # offloading to a thread if backups become slow.
                if should_take_backup(self._settings.data_dir):
                    take_backup(
                        db_path=self._settings.db_path,
                        data_dir=self._settings.data_dir,
                    )
                    prune_backups(self._settings.data_dir, keep=14)
                    log.info("nightly backup taken")
            except Exception as exc:  # noqa: BLE001 — never break the loop
                log.exception("backup tick failed: %s", exc)
            try:
                await asyncio.wait_for(
                    self._tick_stop.wait(),
                    timeout=BACKUP_TICK_INTERVAL_SECONDS,
                )
            except asyncio.TimeoutError:
                # Normal path: timed out waiting for stop, run another tick.
                pass

    async def _process(self, job: SignificanceJob) -> None:
        """Score one job, record the result, then run the snapshot check.

        Writes a ``memory_significance_set`` event and, for scores of 3
        or more, auto-pins the memory (subject to the soft cap).
        Exceptions from scoring or from the significance write propagate
        to :meth:`_run`, which logs them; snapshot failures are caught
        and logged here.
        """
        client = self._llm_client_factory()
        score = await compute_significance(
            client,
            model=self._settings.classifier_model,
            narrative_text=job.narrative_text,
            prior_dialogue=job.prior_dialogue,
        )
        with open_db(self._settings.db_path) as conn:
            append_and_apply(
                conn,
                kind="memory_significance_set",
                payload={
                    "memory_id": job.memory_id,
                    "significance": score,
                },
            )
            if score >= 3:
                _auto_pin_with_cap(
                    conn,
                    owner_id=job.host_bot_id,
                    memory_id=job.memory_id,
                )
        # T31: piggy-back the periodic snapshot check on the background
        # worker so we don't need a separate timer task. The classifier
        # pass already runs out-of-band, so snapshot I/O on the same
        # worker is a natural fit. Each snapshot opens its own
        # connection so we don't conflate the snapshot's read-only view
        # with the significance-write transaction above. Failures are
        # caught and logged: a flaky disk shouldn't take down the
        # significance pipeline.
        try:
            with open_db(self._settings.db_path) as conn:
                if should_take_periodic_snapshot(
                    conn, self._settings.data_dir
                ):
                    snapshot_path = take_snapshot(
                        conn,
                        data_dir=self._settings.data_dir,
                        kind="periodic",
                    )
                    prune_periodic_snapshots(
                        self._settings.data_dir, keep=5
                    )
                    log.info(
                        "periodic snapshot taken: %s", snapshot_path
                    )
        except Exception as exc:  # noqa: BLE001 — never break the worker
            log.exception("periodic snapshot failed: %s", exc)
def _auto_pin_with_cap(
    conn,
    *,
    owner_id: str,
    memory_id: int,
    cap: int = 8,
) -> None:
    """Pin ``memory_id`` as an auto-pin, then enforce the per-owner soft cap.

    Per §8.5: pivotal turns are auto-pinned, with a soft cap of 8 pins per
    bot. The cap check counts every pinned memory of ``owner_id``; when
    that count exceeds ``cap``, the single oldest auto-pin other than the
    one just created is unpinned. Manual pins are never auto-evicted — the
    eviction query filters on ``auto_pinned = 1`` — so if no other auto-pin
    exists nothing is evicted and the owner stays over the cap.

    Args:
        conn: Open DB connection used for both the events and the queries.
        owner_id: Owner whose pins are counted and evicted from.
        memory_id: Memory to pin; never chosen as the eviction victim.
        cap: Maximum number of pinned memories before eviction kicks in.
    """

    def _record_pin_state(target_id: int, flag: int) -> None:
        # Both events written here set ``pinned`` and ``auto_pinned`` to
        # the same value: 1 when auto-pinning, 0 when evicting.
        append_and_apply(
            conn,
            kind="memory_pin_changed",
            payload={
                "memory_id": target_id,
                "pinned": flag,
                "auto_pinned": flag,
            },
        )

    _record_pin_state(memory_id, 1)

    pinned_total = conn.execute(
        "SELECT COUNT(*) FROM memories WHERE owner_id = ? AND pinned = 1",
        (owner_id,),
    ).fetchone()[0]

    if pinned_total > cap:
        # Oldest auto-pin first; id breaks ties between equal timestamps.
        oldest_auto_pin = conn.execute(
            "SELECT id FROM memories "
            "WHERE owner_id = ? AND pinned = 1 AND auto_pinned = 1 AND id != ? "
            "ORDER BY created_at ASC, id ASC LIMIT 1",
            (owner_id, memory_id),
        ).fetchone()
        if oldest_auto_pin is not None:
            _record_pin_state(oldest_auto_pin[0], 0)