"""Tests for Task 31 — periodic snapshots with retention and cold-load fast-path. Per Requirements §10.4 the periodic snapshot policy is: * Take a snapshot every 100 events OR every 30 minutes since the last one, whichever comes first. * Store under ``data/snapshots/periodic/`` with a UTC timestamp filename. * Retain only the last 5 periodic snapshots; prune older ones on write. * On cold load, restore from the most recent snapshot and replay events past the snapshot's ``last_event_id`` to bring projected state forward. These tests cover the functional core (snapshot timing, pruning, restore). Worker- and lifespan-level wiring is covered by the integration tests in ``test_turn_flow`` and the existing app boot tests. """ from __future__ import annotations import json from chat.db.connection import open_db from chat.db.migrate import apply_migrations from chat.eventlog.log import append_event from chat.eventlog.projector import project from chat.services.snapshot import ( latest_snapshot_path, prune_periodic_snapshots, restore_from_snapshot, should_take_periodic_snapshot, take_snapshot, ) # Importing the state modules registers their projector handlers as a # side effect — restoring + replaying needs them present. 
import chat.state.entities  # noqa: F401
import chat.state.edges  # noqa: F401
import chat.state.manual_edit  # noqa: F401
import chat.state.memory  # noqa: F401
import chat.state.world  # noqa: F401


def _bot_payload(bot_id: str, name: str) -> dict:
    """Build the payload used for ``bot_authored`` events in these tests.

    Only ``id`` and ``name`` vary between calls; the remaining fields are
    fixed filler, rebuilt on every call so no list value is shared between
    the returned dicts.
    """
    filler = {
        "persona": "fancy",
        "voice_samples": ["sample"],
        "traits": ["shy"],
        "backstory": "",
        "initial_relationship_to_you": "coworker",
        "kickoff_prose": "",
    }
    return {"id": bot_id, "name": name, **filler}


def _migrated_db(tmp_path, filename):
    """Apply migrations to ``tmp_path / filename`` and return that path."""
    db_path = tmp_path / filename
    apply_migrations(db_path)
    return db_path


def _author_bot_a(conn):
    """Append the ``bot_a`` authoring event on *conn*, then run the projector."""
    append_event(
        conn,
        kind="bot_authored",
        payload=_bot_payload("bot_a", "BotA"),
    )
    project(conn)


def test_take_snapshot_includes_last_event_id(tmp_path):
    """A periodic snapshot's JSON dump carries ``last_event_id`` >= 1."""
    with open_db(_migrated_db(tmp_path, "t.db")) as conn:
        _author_bot_a(conn)
        snapshot_file = take_snapshot(
            conn, data_dir=tmp_path / "data", kind="periodic"
        )
        contents = json.loads(snapshot_file.read_text())
        assert "last_event_id" in contents
        assert contents["last_event_id"] >= 1


def test_should_take_periodic_when_no_prior_and_events_exist(tmp_path):
    """With one event logged and no snapshot on disk, a snapshot is due."""
    with open_db(_migrated_db(tmp_path, "t.db")) as conn:
        _author_bot_a(conn)
        is_due = should_take_periodic_snapshot(conn, tmp_path / "data")
        assert is_due is True


def test_should_not_take_when_recent_and_few_events(tmp_path):
    """Right after a snapshot, with no further events, none is due."""
    data_dir = tmp_path / "data"
    with open_db(_migrated_db(tmp_path, "t.db")) as conn:
        _author_bot_a(conn)
        # Establish a fresh baseline snapshot first.
        take_snapshot(conn, data_dir=data_dir, kind="periodic")
        # Immediately afterwards we are inside the time threshold and have
        # fewer than 100 new events, so the answer must be False.
        is_due = should_take_periodic_snapshot(conn, data_dir)
        assert is_due is False


def test_prune_keeps_last_5(tmp_path):
    """Pruning eight snapshots with ``keep=5`` removes the three oldest."""
    data_dir = tmp_path / "data"
    periodic_dir = data_dir / "snapshots" / "periodic"
    periodic_dir.mkdir(parents=True)

    # Eight placeholder snapshot files whose names sort in creation order.
    for i in range(8):
        placeholder = periodic_dir / f"2026010{i}T000000Z.json"
        placeholder.write_text(json.dumps({"last_event_id": i}))

    pruned_count = prune_periodic_snapshots(data_dir, keep=5)
    assert pruned_count == 3

    survivors = sorted(periodic_dir.glob("*.json"))
    assert len(survivors) == 5
    # Only the five highest-sorting (most recent) names may survive.
    oldest_kept, newest_kept = survivors[0], survivors[-1]
    assert oldest_kept.name == "20260103T000000Z.json"
    assert newest_kept.name == "20260107T000000Z.json"


def test_latest_snapshot_path_returns_none_when_missing(tmp_path):
    """``latest_snapshot_path`` yields None for a missing or empty directory."""
    data_dir = tmp_path / "data"

    # Case 1: the periodic snapshot directory has not been created.
    assert latest_snapshot_path(data_dir, kind="periodic") is None

    # Case 2: the directory exists but holds no snapshots.
    (data_dir / "snapshots" / "periodic").mkdir(parents=True)
    assert latest_snapshot_path(data_dir, kind="periodic") is None


def test_restore_from_snapshot_repopulates_tables(tmp_path):
    """Restoring a snapshot into a fresh DB brings back the seeded bot row."""
    # Source database: author one bot and capture it in a snapshot.
    with open_db(_migrated_db(tmp_path, "t1.db")) as source_conn:
        _author_bot_a(source_conn)
        snapshot_path = take_snapshot(
            source_conn, data_dir=tmp_path / "data", kind="periodic"
        )

    # Target database: migrated but otherwise empty. The restore alone must
    # repopulate it; no event-log replay is performed here.
    with open_db(_migrated_db(tmp_path, "t2.db")) as target_conn:
        restored_up_to = restore_from_snapshot(target_conn, snapshot_path)
        assert restored_up_to >= 1
        row = target_conn.execute(
            "SELECT name FROM bots WHERE id = 'bot_a'"
        ).fetchone()
        assert row is not None
        assert row[0] == "BotA"