merge: T95 delete-impact computation service

This commit is contained in:
Joseph Doherty
2026-04-27 02:37:28 -04:00
2 changed files with 395 additions and 0 deletions
+147
View File
@@ -0,0 +1,147 @@
"""Delete-impact computation service (T95, Phase 4).
Walks event_log forward from a target event_id and produces an ImpactReport
describing what would be removed if rewind-to-target were invoked. Pure
computation — does NOT mutate the database. Used by T98's drawer surgical-
delete UI to render an 'are you sure?' modal before invoking the actual
rewind path (chat/services/rewind.py).
"""
from __future__ import annotations
import json
from sqlite3 import Connection
from pydantic import BaseModel, Field
class DeletedItem(BaseModel):
kind: str
description: str
target_id: int | str | None = None
class ImpactReport(BaseModel):
target_event_id: int
cascading: list[DeletedItem] = Field(default_factory=list)
notes: list[str] = Field(default_factory=list)
def _excerpt(text: str, n: int = 60) -> str:
text = (text or "").strip().replace("\n", " ")
return text if len(text) <= n else text[: n - 1] + ""
def compute_delete_impact(
conn: Connection,
*,
target_event_id: int,
) -> ImpactReport:
"""Compute the cascading impact of rewinding to target_event_id."""
# Verify target exists.
target_row = conn.execute(
"SELECT id, kind, payload_json FROM event_log WHERE id = ?",
(target_event_id,),
).fetchone()
if target_row is None:
return ImpactReport(
target_event_id=target_event_id,
cascading=[],
notes=[f"target event_id {target_event_id} not found"],
)
# Walk forward: every event with id >= target_event_id is in scope.
rows = conn.execute(
"SELECT id, kind, payload_json FROM event_log "
"WHERE id >= ? ORDER BY id ASC",
(target_event_id,),
).fetchall()
cascading: list[DeletedItem] = []
notes: list[str] = []
scene_close_present = False
regenerated_from = None
for row_id, kind, payload_json in rows:
try:
payload = json.loads(payload_json) if payload_json else {}
except (json.JSONDecodeError, TypeError):
payload = {}
if kind == "memory_written":
cascading.append(
DeletedItem(
kind=kind,
description=f"memory: {_excerpt(payload.get('pov_summary', ''))}",
target_id=payload.get("memory_id"),
)
)
elif kind == "edge_update":
src = payload.get("source_id", "?")
tgt = payload.get("target_id", "?")
cascading.append(
DeletedItem(
kind=kind,
description=f"edge update: {src} -> {tgt}",
target_id=f"{src}->{tgt}",
)
)
elif kind == "scene_closed":
scene_close_present = True
cascading.append(
DeletedItem(
kind=kind,
description=f"scene close at {payload.get('closed_at', '?')}",
target_id=payload.get("scene_id"),
)
)
elif kind in ("user_turn", "user_turn_edit", "assistant_turn"):
speaker = payload.get("speaker_id") or ("you" if kind.startswith("user") else "?")
prose = payload.get("prose") or payload.get("text") or ""
cascading.append(
DeletedItem(
kind=kind,
description=f"turn {row_id} ({speaker}: {_excerpt(prose, 50)})",
target_id=row_id,
)
)
if regenerated_from is None and payload.get("regenerated_from"):
regenerated_from = payload["regenerated_from"]
elif kind == "manual_edit":
target_kind = payload.get("target_kind", "?")
cascading.append(
DeletedItem(
kind=kind,
description=f"manual edit: {target_kind}",
target_id=payload.get("target_id"),
)
)
else:
cascading.append(
DeletedItem(
kind=kind,
description=f"{kind} event",
target_id=row_id,
)
)
# Notes / warnings.
notes.append(f"{len(rows)} events would be discarded total")
if scene_close_present:
notes.append(
"scene close events are in scope — closing-scene per-POV summaries "
"and group_node updates will be reverted"
)
if regenerated_from is not None:
notes.append(
f"target turn was regenerated from event_id {regenerated_from}; "
f"the original turn remains intact"
)
return ImpactReport(
target_event_id=target_event_id,
cascading=cascading,
notes=notes,
)
__all__ = ["DeletedItem", "ImpactReport", "compute_delete_impact"]
+248
View File
@@ -0,0 +1,248 @@
"""Tests for Task 95 — delete-impact computation service (Phase 4).
`compute_delete_impact` walks event_log forward from a target event_id and
produces an :class:`ImpactReport` describing what would be removed if
rewind-to-target were invoked. It is a pure preview — no database mutation.
T98's drawer surgical-delete UI uses this to render an "are you sure?"
modal before invoking the actual rewind path.
"""
from __future__ import annotations
from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_event
from chat.services.delete_impact import compute_delete_impact
def _seed_chat(conn) -> tuple[int, int]:
"""Append minimal bot + chat events; return their event ids."""
bot_id = append_event(
conn,
kind="bot_authored",
payload={
"id": "bot_a",
"name": "BotA",
"persona": "...",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "",
"kickoff_prose": "",
},
)
chat_id = append_event(
conn,
kind="chat_created",
payload={
"id": "chat_bot_a",
"host_bot_id": "bot_a",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1",
"weather": "",
},
)
return bot_id, chat_id
def test_impact_for_simple_turn_lists_memory_and_edges(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
user_id = append_event(
conn,
kind="user_turn",
payload={
"chat_id": "chat_bot_a",
"prose": "hey there friend",
"segments": [],
},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "Hi! Good to see you.",
"truncated": False,
"user_turn_id": user_id,
},
)
append_event(
conn,
kind="memory_written",
payload={
"owner_id": "bot_a",
"chat_id": "chat_bot_a",
"pov_summary": "You greeted me warmly today.",
"witness_you": 1,
"witness_host": 1,
"witness_guest": 0,
"source": "turn",
"reliability": 1.0,
"significance": 1,
"pinned": 0,
"auto_pinned": 0,
},
)
append_event(
conn,
kind="edge_update",
payload={
"source_id": "you",
"target_id": "bot_a",
"affinity_delta": 0.1,
},
)
report = compute_delete_impact(conn, target_event_id=user_id)
assert report.target_event_id == user_id
kinds = [item.kind for item in report.cascading]
# Walk from user_turn forward — user_turn, assistant_turn,
# memory_written, edge_update should all be in scope, in order.
assert kinds == [
"user_turn",
"assistant_turn",
"memory_written",
"edge_update",
]
# Memory description includes the pov_summary excerpt.
mem_item = report.cascading[2]
assert "memory:" in mem_item.description
assert "greeted" in mem_item.description
# Edge description includes both endpoints.
edge_item = report.cascading[3]
assert "you" in edge_item.description
assert "bot_a" in edge_item.description
assert edge_item.target_id == "you->bot_a"
# Notes mentions total count.
assert any("4 events" in n for n in report.notes)
def test_impact_for_scene_opening_turn_warns_about_subsequent(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
early_id = append_event(
conn,
kind="user_turn",
payload={"chat_id": "chat_bot_a", "prose": "the start", "segments": []},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "ok",
"truncated": False,
"user_turn_id": early_id,
},
)
append_event(
conn,
kind="scene_closed",
payload={
"scene_id": 1,
"closed_at": "2026-04-26T21:00:00+00:00",
"significance": 2,
},
)
report = compute_delete_impact(conn, target_event_id=early_id)
# Scene-close warning fires when one is in scope.
assert any("scene close" in n.lower() for n in report.notes)
# The scene_closed event also appears as a cascading item.
assert any(item.kind == "scene_closed" for item in report.cascading)
def test_impact_for_missing_event_returns_empty_with_note(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
report = compute_delete_impact(conn, target_event_id=999_999)
assert report.cascading == []
assert any("not found" in n for n in report.notes)
def test_impact_does_not_mutate_database(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
user_id = append_event(
conn,
kind="user_turn",
payload={"chat_id": "chat_bot_a", "prose": "hi", "segments": []},
)
append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "hello",
"truncated": False,
"user_turn_id": user_id,
},
)
# Snapshot all event_log rows as a tuple-of-tuples.
before = conn.execute(
"SELECT id, branch_id, ts, kind, payload_json, superseded_by, "
"hidden FROM event_log ORDER BY id"
).fetchall()
compute_delete_impact(conn, target_event_id=user_id)
after = conn.execute(
"SELECT id, branch_id, ts, kind, payload_json, superseded_by, "
"hidden FROM event_log ORDER BY id"
).fetchall()
# Byte-identical: nothing inserted, deleted, or updated.
assert before == after
def test_impact_includes_regenerated_from_warning(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
original_id = append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "first try",
"truncated": False,
"user_turn_id": 0,
},
)
regen_id = append_event(
conn,
kind="assistant_turn",
payload={
"chat_id": "chat_bot_a",
"speaker_id": "bot_a",
"text": "second try",
"truncated": False,
"user_turn_id": 0,
"regenerated_from": original_id,
},
)
report = compute_delete_impact(conn, target_event_id=regen_id)
# The regenerated_from note carries the original event id so the user
# knows the original turn isn't lost.
assert any("regenerated from" in n for n in report.notes)
assert any(str(original_id) in n for n in report.notes)