feat: delete-impact computation service (preview without mutation) (T95)
This commit is contained in:
@@ -0,0 +1,147 @@
|
||||
"""Delete-impact computation service (T95, Phase 4).
|
||||
|
||||
Walks event_log forward from a target event_id and produces an ImpactReport
|
||||
describing what would be removed if rewind-to-target were invoked. Pure
|
||||
computation — does NOT mutate the database. Used by T98's drawer surgical-
|
||||
delete UI to render an 'are you sure?' modal before invoking the actual
|
||||
rewind path (chat/services/rewind.py).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import json
|
||||
from sqlite3 import Connection
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class DeletedItem(BaseModel):
|
||||
kind: str
|
||||
description: str
|
||||
target_id: int | str | None = None
|
||||
|
||||
|
||||
class ImpactReport(BaseModel):
|
||||
target_event_id: int
|
||||
cascading: list[DeletedItem] = Field(default_factory=list)
|
||||
notes: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
def _excerpt(text: str, n: int = 60) -> str:
|
||||
text = (text or "").strip().replace("\n", " ")
|
||||
return text if len(text) <= n else text[: n - 1] + "…"
|
||||
|
||||
|
||||
def compute_delete_impact(
|
||||
conn: Connection,
|
||||
*,
|
||||
target_event_id: int,
|
||||
) -> ImpactReport:
|
||||
"""Compute the cascading impact of rewinding to target_event_id."""
|
||||
# Verify target exists.
|
||||
target_row = conn.execute(
|
||||
"SELECT id, kind, payload_json FROM event_log WHERE id = ?",
|
||||
(target_event_id,),
|
||||
).fetchone()
|
||||
if target_row is None:
|
||||
return ImpactReport(
|
||||
target_event_id=target_event_id,
|
||||
cascading=[],
|
||||
notes=[f"target event_id {target_event_id} not found"],
|
||||
)
|
||||
|
||||
# Walk forward: every event with id >= target_event_id is in scope.
|
||||
rows = conn.execute(
|
||||
"SELECT id, kind, payload_json FROM event_log "
|
||||
"WHERE id >= ? ORDER BY id ASC",
|
||||
(target_event_id,),
|
||||
).fetchall()
|
||||
|
||||
cascading: list[DeletedItem] = []
|
||||
notes: list[str] = []
|
||||
scene_close_present = False
|
||||
regenerated_from = None
|
||||
|
||||
for row_id, kind, payload_json in rows:
|
||||
try:
|
||||
payload = json.loads(payload_json) if payload_json else {}
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
payload = {}
|
||||
|
||||
if kind == "memory_written":
|
||||
cascading.append(
|
||||
DeletedItem(
|
||||
kind=kind,
|
||||
description=f"memory: {_excerpt(payload.get('pov_summary', ''))}",
|
||||
target_id=payload.get("memory_id"),
|
||||
)
|
||||
)
|
||||
elif kind == "edge_update":
|
||||
src = payload.get("source_id", "?")
|
||||
tgt = payload.get("target_id", "?")
|
||||
cascading.append(
|
||||
DeletedItem(
|
||||
kind=kind,
|
||||
description=f"edge update: {src} -> {tgt}",
|
||||
target_id=f"{src}->{tgt}",
|
||||
)
|
||||
)
|
||||
elif kind == "scene_closed":
|
||||
scene_close_present = True
|
||||
cascading.append(
|
||||
DeletedItem(
|
||||
kind=kind,
|
||||
description=f"scene close at {payload.get('closed_at', '?')}",
|
||||
target_id=payload.get("scene_id"),
|
||||
)
|
||||
)
|
||||
elif kind in ("user_turn", "user_turn_edit", "assistant_turn"):
|
||||
speaker = payload.get("speaker_id") or ("you" if kind.startswith("user") else "?")
|
||||
prose = payload.get("prose") or payload.get("text") or ""
|
||||
cascading.append(
|
||||
DeletedItem(
|
||||
kind=kind,
|
||||
description=f"turn {row_id} ({speaker}: {_excerpt(prose, 50)})",
|
||||
target_id=row_id,
|
||||
)
|
||||
)
|
||||
if regenerated_from is None and payload.get("regenerated_from"):
|
||||
regenerated_from = payload["regenerated_from"]
|
||||
elif kind == "manual_edit":
|
||||
target_kind = payload.get("target_kind", "?")
|
||||
cascading.append(
|
||||
DeletedItem(
|
||||
kind=kind,
|
||||
description=f"manual edit: {target_kind}",
|
||||
target_id=payload.get("target_id"),
|
||||
)
|
||||
)
|
||||
else:
|
||||
cascading.append(
|
||||
DeletedItem(
|
||||
kind=kind,
|
||||
description=f"{kind} event",
|
||||
target_id=row_id,
|
||||
)
|
||||
)
|
||||
|
||||
# Notes / warnings.
|
||||
notes.append(f"{len(rows)} events would be discarded total")
|
||||
if scene_close_present:
|
||||
notes.append(
|
||||
"scene close events are in scope — closing-scene per-POV summaries "
|
||||
"and group_node updates will be reverted"
|
||||
)
|
||||
if regenerated_from is not None:
|
||||
notes.append(
|
||||
f"target turn was regenerated from event_id {regenerated_from}; "
|
||||
f"the original turn remains intact"
|
||||
)
|
||||
|
||||
return ImpactReport(
|
||||
target_event_id=target_event_id,
|
||||
cascading=cascading,
|
||||
notes=notes,
|
||||
)
|
||||
|
||||
|
||||
__all__ = ["DeletedItem", "ImpactReport", "compute_delete_impact"]
|
||||
@@ -0,0 +1,248 @@
|
||||
"""Tests for Task 95 — delete-impact computation service (Phase 4).
|
||||
|
||||
`compute_delete_impact` walks event_log forward from a target event_id and
|
||||
produces an :class:`ImpactReport` describing what would be removed if
|
||||
rewind-to-target were invoked. It is a pure preview — no database mutation.
|
||||
T98's drawer surgical-delete UI uses this to render an "are you sure?"
|
||||
modal before invoking the actual rewind path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from chat.db.connection import open_db
|
||||
from chat.db.migrate import apply_migrations
|
||||
from chat.eventlog.log import append_event
|
||||
from chat.services.delete_impact import compute_delete_impact
|
||||
|
||||
|
||||
def _seed_chat(conn) -> tuple[int, int]:
|
||||
"""Append minimal bot + chat events; return their event ids."""
|
||||
bot_id = append_event(
|
||||
conn,
|
||||
kind="bot_authored",
|
||||
payload={
|
||||
"id": "bot_a",
|
||||
"name": "BotA",
|
||||
"persona": "...",
|
||||
"voice_samples": [],
|
||||
"traits": [],
|
||||
"backstory": "",
|
||||
"initial_relationship_to_you": "",
|
||||
"kickoff_prose": "",
|
||||
},
|
||||
)
|
||||
chat_id = append_event(
|
||||
conn,
|
||||
kind="chat_created",
|
||||
payload={
|
||||
"id": "chat_bot_a",
|
||||
"host_bot_id": "bot_a",
|
||||
"initial_time": "2026-04-26T20:00:00+00:00",
|
||||
"narrative_anchor": "Day 1",
|
||||
"weather": "",
|
||||
},
|
||||
)
|
||||
return bot_id, chat_id
|
||||
|
||||
|
||||
def test_impact_for_simple_turn_lists_memory_and_edges(tmp_path):
|
||||
db = tmp_path / "t.db"
|
||||
apply_migrations(db)
|
||||
with open_db(db) as conn:
|
||||
_seed_chat(conn)
|
||||
user_id = append_event(
|
||||
conn,
|
||||
kind="user_turn",
|
||||
payload={
|
||||
"chat_id": "chat_bot_a",
|
||||
"prose": "hey there friend",
|
||||
"segments": [],
|
||||
},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": "chat_bot_a",
|
||||
"speaker_id": "bot_a",
|
||||
"text": "Hi! Good to see you.",
|
||||
"truncated": False,
|
||||
"user_turn_id": user_id,
|
||||
},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="memory_written",
|
||||
payload={
|
||||
"owner_id": "bot_a",
|
||||
"chat_id": "chat_bot_a",
|
||||
"pov_summary": "You greeted me warmly today.",
|
||||
"witness_you": 1,
|
||||
"witness_host": 1,
|
||||
"witness_guest": 0,
|
||||
"source": "turn",
|
||||
"reliability": 1.0,
|
||||
"significance": 1,
|
||||
"pinned": 0,
|
||||
"auto_pinned": 0,
|
||||
},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="edge_update",
|
||||
payload={
|
||||
"source_id": "you",
|
||||
"target_id": "bot_a",
|
||||
"affinity_delta": 0.1,
|
||||
},
|
||||
)
|
||||
|
||||
report = compute_delete_impact(conn, target_event_id=user_id)
|
||||
|
||||
assert report.target_event_id == user_id
|
||||
kinds = [item.kind for item in report.cascading]
|
||||
# Walk from user_turn forward — user_turn, assistant_turn,
|
||||
# memory_written, edge_update should all be in scope, in order.
|
||||
assert kinds == [
|
||||
"user_turn",
|
||||
"assistant_turn",
|
||||
"memory_written",
|
||||
"edge_update",
|
||||
]
|
||||
# Memory description includes the pov_summary excerpt.
|
||||
mem_item = report.cascading[2]
|
||||
assert "memory:" in mem_item.description
|
||||
assert "greeted" in mem_item.description
|
||||
# Edge description includes both endpoints.
|
||||
edge_item = report.cascading[3]
|
||||
assert "you" in edge_item.description
|
||||
assert "bot_a" in edge_item.description
|
||||
assert edge_item.target_id == "you->bot_a"
|
||||
# Notes mentions total count.
|
||||
assert any("4 events" in n for n in report.notes)
|
||||
|
||||
|
||||
def test_impact_for_scene_opening_turn_warns_about_subsequent(tmp_path):
|
||||
db = tmp_path / "t.db"
|
||||
apply_migrations(db)
|
||||
with open_db(db) as conn:
|
||||
_seed_chat(conn)
|
||||
early_id = append_event(
|
||||
conn,
|
||||
kind="user_turn",
|
||||
payload={"chat_id": "chat_bot_a", "prose": "the start", "segments": []},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": "chat_bot_a",
|
||||
"speaker_id": "bot_a",
|
||||
"text": "ok",
|
||||
"truncated": False,
|
||||
"user_turn_id": early_id,
|
||||
},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="scene_closed",
|
||||
payload={
|
||||
"scene_id": 1,
|
||||
"closed_at": "2026-04-26T21:00:00+00:00",
|
||||
"significance": 2,
|
||||
},
|
||||
)
|
||||
|
||||
report = compute_delete_impact(conn, target_event_id=early_id)
|
||||
|
||||
# Scene-close warning fires when one is in scope.
|
||||
assert any("scene close" in n.lower() for n in report.notes)
|
||||
# The scene_closed event also appears as a cascading item.
|
||||
assert any(item.kind == "scene_closed" for item in report.cascading)
|
||||
|
||||
|
||||
def test_impact_for_missing_event_returns_empty_with_note(tmp_path):
|
||||
db = tmp_path / "t.db"
|
||||
apply_migrations(db)
|
||||
with open_db(db) as conn:
|
||||
_seed_chat(conn)
|
||||
report = compute_delete_impact(conn, target_event_id=999_999)
|
||||
|
||||
assert report.cascading == []
|
||||
assert any("not found" in n for n in report.notes)
|
||||
|
||||
|
||||
def test_impact_does_not_mutate_database(tmp_path):
|
||||
db = tmp_path / "t.db"
|
||||
apply_migrations(db)
|
||||
with open_db(db) as conn:
|
||||
_seed_chat(conn)
|
||||
user_id = append_event(
|
||||
conn,
|
||||
kind="user_turn",
|
||||
payload={"chat_id": "chat_bot_a", "prose": "hi", "segments": []},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": "chat_bot_a",
|
||||
"speaker_id": "bot_a",
|
||||
"text": "hello",
|
||||
"truncated": False,
|
||||
"user_turn_id": user_id,
|
||||
},
|
||||
)
|
||||
|
||||
# Snapshot all event_log rows as a tuple-of-tuples.
|
||||
before = conn.execute(
|
||||
"SELECT id, branch_id, ts, kind, payload_json, superseded_by, "
|
||||
"hidden FROM event_log ORDER BY id"
|
||||
).fetchall()
|
||||
|
||||
compute_delete_impact(conn, target_event_id=user_id)
|
||||
|
||||
after = conn.execute(
|
||||
"SELECT id, branch_id, ts, kind, payload_json, superseded_by, "
|
||||
"hidden FROM event_log ORDER BY id"
|
||||
).fetchall()
|
||||
|
||||
# Byte-identical: nothing inserted, deleted, or updated.
|
||||
assert before == after
|
||||
|
||||
|
||||
def test_impact_includes_regenerated_from_warning(tmp_path):
|
||||
db = tmp_path / "t.db"
|
||||
apply_migrations(db)
|
||||
with open_db(db) as conn:
|
||||
_seed_chat(conn)
|
||||
original_id = append_event(
|
||||
conn,
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": "chat_bot_a",
|
||||
"speaker_id": "bot_a",
|
||||
"text": "first try",
|
||||
"truncated": False,
|
||||
"user_turn_id": 0,
|
||||
},
|
||||
)
|
||||
regen_id = append_event(
|
||||
conn,
|
||||
kind="assistant_turn",
|
||||
payload={
|
||||
"chat_id": "chat_bot_a",
|
||||
"speaker_id": "bot_a",
|
||||
"text": "second try",
|
||||
"truncated": False,
|
||||
"user_turn_id": 0,
|
||||
"regenerated_from": original_id,
|
||||
},
|
||||
)
|
||||
|
||||
report = compute_delete_impact(conn, target_event_id=regen_id)
|
||||
|
||||
# The regenerated_from note carries the original event id so the user
|
||||
# knows the original turn isn't lost.
|
||||
assert any("regenerated from" in n for n in report.notes)
|
||||
assert any(str(original_id) in n for n in report.notes)
|
||||
Reference in New Issue
Block a user