diff --git a/chat/services/delete_impact.py b/chat/services/delete_impact.py new file mode 100644 index 0000000..28ce422 --- /dev/null +++ b/chat/services/delete_impact.py @@ -0,0 +1,147 @@ +"""Delete-impact computation service (T95, Phase 4). + +Walks event_log forward from a target event_id and produces an ImpactReport +describing what would be removed if rewind-to-target were invoked. Pure +computation — does NOT mutate the database. Used by T98's drawer surgical- +delete UI to render an 'are you sure?' modal before invoking the actual +rewind path (chat/services/rewind.py). +""" + +from __future__ import annotations +import json +from sqlite3 import Connection + +from pydantic import BaseModel, Field + + +class DeletedItem(BaseModel): + kind: str + description: str + target_id: int | str | None = None + + +class ImpactReport(BaseModel): + target_event_id: int + cascading: list[DeletedItem] = Field(default_factory=list) + notes: list[str] = Field(default_factory=list) + + +def _excerpt(text: str, n: int = 60) -> str: + text = (text or "").strip().replace("\n", " ") + return text if len(text) <= n else text[: n - 1] + "…" + + +def compute_delete_impact( + conn: Connection, + *, + target_event_id: int, +) -> ImpactReport: + """Compute the cascading impact of rewinding to target_event_id.""" + # Verify target exists. + target_row = conn.execute( + "SELECT id, kind, payload_json FROM event_log WHERE id = ?", + (target_event_id,), + ).fetchone() + if target_row is None: + return ImpactReport( + target_event_id=target_event_id, + cascading=[], + notes=[f"target event_id {target_event_id} not found"], + ) + + # Walk forward: every event with id >= target_event_id is in scope. + rows = conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE id >= ? ORDER BY id ASC", + (target_event_id,), + ).fetchall() + + cascading: list[DeletedItem] = [] + notes: list[str] = [] + scene_close_present = False + regenerated_from = None + + for row_id, kind, payload_json in rows: + try: + payload = json.loads(payload_json) if payload_json else {} + except (json.JSONDecodeError, TypeError): + payload = {} + + if kind == "memory_written": + cascading.append( + DeletedItem( + kind=kind, + description=f"memory: {_excerpt(payload.get('pov_summary', ''))}", + target_id=payload.get("memory_id"), + ) + ) + elif kind == "edge_update": + src = payload.get("source_id", "?") + tgt = payload.get("target_id", "?") + cascading.append( + DeletedItem( + kind=kind, + description=f"edge update: {src} -> {tgt}", + target_id=f"{src}->{tgt}", + ) + ) + elif kind == "scene_closed": + scene_close_present = True + cascading.append( + DeletedItem( + kind=kind, + description=f"scene close at {payload.get('closed_at', '?')}", + target_id=payload.get("scene_id"), + ) + ) + elif kind in ("user_turn", "user_turn_edit", "assistant_turn"): + speaker = payload.get("speaker_id") or ("you" if kind.startswith("user") else "?") + prose = payload.get("prose") or payload.get("text") or "" + cascading.append( + DeletedItem( + kind=kind, + description=f"turn {row_id} ({speaker}: {_excerpt(prose, 50)})", + target_id=row_id, + ) + ) + if regenerated_from is None and payload.get("regenerated_from"): + regenerated_from = payload["regenerated_from"] + elif kind == "manual_edit": + target_kind = payload.get("target_kind", "?") + cascading.append( + DeletedItem( + kind=kind, + description=f"manual edit: {target_kind}", + target_id=payload.get("target_id"), + ) + ) + else: + cascading.append( + DeletedItem( + kind=kind, + description=f"{kind} event", + target_id=row_id, + ) + ) + + # Notes / warnings. + notes.append(f"{len(rows)} events would be discarded total") + if scene_close_present: + notes.append( + "scene close events are in scope — closing-scene per-POV summaries " + "and group_node updates will be reverted" + ) + if regenerated_from is not None: + notes.append( + f"target turn was regenerated from event_id {regenerated_from}; " + f"the original turn remains intact" + ) + + return ImpactReport( + target_event_id=target_event_id, + cascading=cascading, + notes=notes, + ) + + +__all__ = ["DeletedItem", "ImpactReport", "compute_delete_impact"] diff --git a/tests/test_delete_impact.py b/tests/test_delete_impact.py new file mode 100644 index 0000000..4c00f07 --- /dev/null +++ b/tests/test_delete_impact.py @@ -0,0 +1,248 @@ +"""Tests for Task 95 — delete-impact computation service (Phase 4). + +`compute_delete_impact` walks event_log forward from a target event_id and +produces an :class:`ImpactReport` describing what would be removed if +rewind-to-target were invoked. It is a pure preview — no database mutation. +T98's drawer surgical-delete UI uses this to render an "are you sure?" +modal before invoking the actual rewind path. +""" + +from __future__ import annotations + +from chat.db.connection import open_db +from chat.db.migrate import apply_migrations +from chat.eventlog.log import append_event +from chat.services.delete_impact import compute_delete_impact + + +def _seed_chat(conn) -> tuple[int, int]: + """Append minimal bot + chat events; return their event ids.""" + bot_id = append_event( + conn, + kind="bot_authored", + payload={ + "id": "bot_a", + "name": "BotA", + "persona": "...", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "", + "kickoff_prose": "", + }, + ) + chat_id = append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_bot_a", + "host_bot_id": "bot_a", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + return bot_id, chat_id + + +def test_impact_for_simple_turn_lists_memory_and_edges(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + user_id = append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_bot_a", + "prose": "hey there friend", + "segments": [], + }, + ) + append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "Hi! Good to see you.", + "truncated": False, + "user_turn_id": user_id, + }, + ) + append_event( + conn, + kind="memory_written", + payload={ + "owner_id": "bot_a", + "chat_id": "chat_bot_a", + "pov_summary": "You greeted me warmly today.", + "witness_you": 1, + "witness_host": 1, + "witness_guest": 0, + "source": "turn", + "reliability": 1.0, + "significance": 1, + "pinned": 0, + "auto_pinned": 0, + }, + ) + append_event( + conn, + kind="edge_update", + payload={ + "source_id": "you", + "target_id": "bot_a", + "affinity_delta": 0.1, + }, + ) + + report = compute_delete_impact(conn, target_event_id=user_id) + + assert report.target_event_id == user_id + kinds = [item.kind for item in report.cascading] + # Walk from user_turn forward — user_turn, assistant_turn, + # memory_written, edge_update should all be in scope, in order. + assert kinds == [ + "user_turn", + "assistant_turn", + "memory_written", + "edge_update", + ] + # Memory description includes the pov_summary excerpt. + mem_item = report.cascading[2] + assert "memory:" in mem_item.description + assert "greeted" in mem_item.description + # Edge description includes both endpoints. + edge_item = report.cascading[3] + assert "you" in edge_item.description + assert "bot_a" in edge_item.description + assert edge_item.target_id == "you->bot_a" + # Notes mentions total count. + assert any("4 events" in n for n in report.notes) + + +def test_impact_for_scene_opening_turn_warns_about_subsequent(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + early_id = append_event( + conn, + kind="user_turn", + payload={"chat_id": "chat_bot_a", "prose": "the start", "segments": []}, + ) + append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "ok", + "truncated": False, + "user_turn_id": early_id, + }, + ) + append_event( + conn, + kind="scene_closed", + payload={ + "scene_id": 1, + "closed_at": "2026-04-26T21:00:00+00:00", + "significance": 2, + }, + ) + + report = compute_delete_impact(conn, target_event_id=early_id) + + # Scene-close warning fires when one is in scope. + assert any("scene close" in n.lower() for n in report.notes) + # The scene_closed event also appears as a cascading item. + assert any(item.kind == "scene_closed" for item in report.cascading) + + +def test_impact_for_missing_event_returns_empty_with_note(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + report = compute_delete_impact(conn, target_event_id=999_999) + + assert report.cascading == [] + assert any("not found" in n for n in report.notes) + + +def test_impact_does_not_mutate_database(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + user_id = append_event( + conn, + kind="user_turn", + payload={"chat_id": "chat_bot_a", "prose": "hi", "segments": []}, + ) + append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "hello", + "truncated": False, + "user_turn_id": user_id, + }, + ) + + # Snapshot all event_log rows as a tuple-of-tuples. + before = conn.execute( + "SELECT id, branch_id, ts, kind, payload_json, superseded_by, " + "hidden FROM event_log ORDER BY id" + ).fetchall() + + compute_delete_impact(conn, target_event_id=user_id) + + after = conn.execute( + "SELECT id, branch_id, ts, kind, payload_json, superseded_by, " + "hidden FROM event_log ORDER BY id" + ).fetchall() + + # Byte-identical: nothing inserted, deleted, or updated. + assert before == after + + +def test_impact_includes_regenerated_from_warning(tmp_path): + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + original_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "first try", + "truncated": False, + "user_turn_id": 0, + }, + ) + regen_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_bot_a", + "speaker_id": "bot_a", + "text": "second try", + "truncated": False, + "user_turn_id": 0, + "regenerated_from": original_id, + }, + ) + + report = compute_delete_impact(conn, target_event_id=regen_id) + + # The regenerated_from note carries the original event id so the user + # knows the original turn isn't lost. + assert any("regenerated from" in n for n in report.notes) + assert any(str(original_id) in n for n in report.notes)