merge: T114 regenerate lifecycle rollback (back-ref + event_status_reverted)
This commit is contained in:
+130
-36
@@ -95,6 +95,27 @@ from chat.web.render import render_turn_html
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# T114.3: map a lifecycle-transition event kind to the events-table
|
||||
# status it implicitly transitioned *from*. Regenerate uses this to pick
|
||||
# the ``prior_status`` value for the ``event_status_reverted`` rollback
|
||||
# event so the projector sets the row back to where it was before the
|
||||
# superseded turn fired the transition.
|
||||
#
|
||||
# - ``event_started`` was emitted when the row was 'planned' → revert to
|
||||
# 'planned'.
|
||||
# - ``event_completed`` was emitted when the row was 'active' → revert
|
||||
# to 'active'.
|
||||
# - ``event_cancelled`` could have fired from either 'planned' or
|
||||
# 'active'. Best-effort default: 'active'. The forward transitions
|
||||
# below only fire detect_event_transitions for currently-active rows,
|
||||
# so 'active' is the realistic prior in practice.
|
||||
_PRIOR_STATUS_MAP: dict[str, str] = {
|
||||
"event_started": "planned",
|
||||
"event_completed": "active",
|
||||
"event_cancelled": "active",
|
||||
}
|
||||
|
||||
|
||||
async def regenerate_assistant_turn(
|
||||
conn: Connection,
|
||||
client,
|
||||
@@ -115,17 +136,18 @@ async def regenerate_assistant_turn(
|
||||
cannot be found — the FastAPI route translates this to 404.
|
||||
|
||||
.. note::
|
||||
**Lifecycle-rollback limitation (T83.4, Phase 4 follow-up).**
|
||||
**Lifecycle rollback (T114, Phase 4.5).**
|
||||
When the superseded turn already produced lifecycle transitions
|
||||
(``event_started`` / ``event_completed`` / ``event_cancelled``),
|
||||
this function does NOT roll those rows back before re-running
|
||||
``detect_event_transitions`` against the regenerated text. A
|
||||
regenerate-after-completion can therefore double-emit promotion
|
||||
artifacts if the new text re-completes the same event. Phase 3.5
|
||||
only documents the gap and emits a WARNING log naming the
|
||||
affected event_log ids; the actual undo pass is invasive
|
||||
(re-projection / inverse-handler dispatch) and is deferred to
|
||||
Phase 4. See the ``# T83.4`` block below for the warning emit.
|
||||
this function emits an ``event_status_reverted`` event for each
|
||||
so the events row's status returns to its prior value before the
|
||||
regenerated narrative is reclassified. Backward compatibility:
|
||||
lifecycle events authored before T114.1 lack the
|
||||
``triggered_by_assistant_turn_id`` payload field; rollback skips
|
||||
those (logged at DEBUG) so historic rows are not retroactively
|
||||
reverted. A WARNING about un-rolled-back transitions is still
|
||||
emitted when stragglers are found — the rollback handles the
|
||||
common case while older logs continue to need manual review.
|
||||
"""
|
||||
chat = get_chat(conn, chat_id)
|
||||
if chat is None:
|
||||
@@ -158,20 +180,21 @@ async def regenerate_assistant_turn(
|
||||
original_assistant_payload = json.loads(row[0])
|
||||
original_user_turn_id = original_assistant_payload.get("user_turn_id")
|
||||
|
||||
# T83.4: scan for downstream lifecycle transitions emitted by the
|
||||
# superseded turn — they're not being rolled back (see method
|
||||
# docstring). Heuristic: any ``event_started`` / ``event_completed``
|
||||
# / ``event_cancelled`` event_log row with id strictly greater than
|
||||
# the original assistant_turn's id was emitted as part of (or after)
|
||||
# that turn's processing. Lifecycle events don't carry ``chat_id``
|
||||
# in their payload (their payload references an ``event_id`` FK to
|
||||
# the ``events`` table, which holds chat_id), so we join through
|
||||
# ``events`` to scope to this chat.
|
||||
#
|
||||
# A WARNING log surfaces the affected event ids so operators can
|
||||
# spot double-emit cases until the Phase 4 rollback pass lands.
|
||||
# T114.3: roll back lifecycle transitions emitted by the superseded
|
||||
# turn. The scan uses the same id-greater-than-superseded-turn
|
||||
# heuristic as the legacy T83.4 warning, joined to ``events`` for
|
||||
# chat scoping (lifecycle events don't carry chat_id in their
|
||||
# payload — they reference an ``event_id`` FK to the ``events``
|
||||
# table, which holds chat_id). For each row whose payload carries
|
||||
# ``triggered_by_assistant_turn_id == original_assistant_event_id``
|
||||
# (T114.1 back-reference), emit an ``event_status_reverted`` event
|
||||
# so the events-row status returns to the pre-transition value.
|
||||
# Lifecycle rows authored before T114.1 lack the back-reference;
|
||||
# those are skipped (DEBUG log) and a WARNING tracks their count so
|
||||
# operators still see legacy stragglers — preserves the T83.4
|
||||
# observability contract for un-rolled-back transitions.
|
||||
unrolled_lifecycle = conn.execute(
|
||||
"SELECT el.id, el.kind FROM event_log AS el "
|
||||
"SELECT el.id, el.kind, el.payload_json FROM event_log AS el "
|
||||
"JOIN events AS ev "
|
||||
" ON ev.event_id = json_extract(el.payload_json, '$.event_id') "
|
||||
"WHERE el.kind IN ("
|
||||
@@ -182,18 +205,73 @@ async def regenerate_assistant_turn(
|
||||
"ORDER BY el.id ASC",
|
||||
(chat_id, original_assistant_event_id),
|
||||
).fetchall()
|
||||
if unrolled_lifecycle:
|
||||
# T90.2: phrased as "at-or-after turn <id>" rather than "from
|
||||
# superseded turn" because regenerating an OLDER turn lists
|
||||
# intervening-turn transitions that legitimately stand on their
|
||||
# own — those weren't authored by the superseded turn itself.
|
||||
rolled_back_ids: list[int] = []
|
||||
skipped_no_backref: list[int] = []
|
||||
for el_id, el_kind, el_payload_json in unrolled_lifecycle:
|
||||
try:
|
||||
lifecycle_payload = json.loads(el_payload_json)
|
||||
except (TypeError, ValueError):
|
||||
skipped_no_backref.append(el_id)
|
||||
continue
|
||||
triggered_by = lifecycle_payload.get("triggered_by_assistant_turn_id")
|
||||
if triggered_by != original_assistant_event_id:
|
||||
# Either a legacy row (no field) or a transition triggered
|
||||
# by a *different* turn — leave it alone. DEBUG so the
|
||||
# message is available under verbose logging without
|
||||
# spamming the default WARNING channel.
|
||||
_log.debug(
|
||||
"regenerate_assistant_turn: skipping rollback for "
|
||||
"lifecycle event_log id=%d (kind=%s) — no back-reference "
|
||||
"or different turn (triggered_by=%r vs superseded=%d)",
|
||||
el_id,
|
||||
el_kind,
|
||||
triggered_by,
|
||||
original_assistant_event_id,
|
||||
)
|
||||
if triggered_by is None:
|
||||
skipped_no_backref.append(el_id)
|
||||
continue
|
||||
prior_status = _PRIOR_STATUS_MAP.get(el_kind)
|
||||
if prior_status is None:
|
||||
# Defensive: the SQL filter already restricts to the three
|
||||
# known kinds, but a future schema addition shouldn't crash
|
||||
# the rollback path.
|
||||
continue
|
||||
target_event_id = lifecycle_payload.get("event_id")
|
||||
if target_event_id is None:
|
||||
continue
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_status_reverted",
|
||||
payload={
|
||||
"event_id": target_event_id,
|
||||
"prior_status": prior_status,
|
||||
},
|
||||
)
|
||||
rolled_back_ids.append(el_id)
|
||||
if rolled_back_ids:
|
||||
_log.info(
|
||||
"regenerate_assistant_turn: rolled back %d lifecycle "
|
||||
"transition(s) triggered by superseded turn %s "
|
||||
"(event_log ids: %s)",
|
||||
len(rolled_back_ids),
|
||||
original_assistant_event_id,
|
||||
rolled_back_ids,
|
||||
)
|
||||
if skipped_no_backref:
|
||||
# T83.4 (legacy) compatibility: still warn about stragglers
|
||||
# without the back-reference so operators can spot pre-T114
|
||||
# double-emit risks. Phrased as "at-or-after turn <id>" per
|
||||
# T90.2 — older transitions may legitimately belong to other
|
||||
# turns.
|
||||
_log.warning(
|
||||
"regenerate_assistant_turn: %d lifecycle transition(s) "
|
||||
"at-or-after turn %s are NOT being rolled back (Phase 4 "
|
||||
"follow-up). Affected event ids: %s",
|
||||
len(unrolled_lifecycle),
|
||||
"at-or-after turn %s are NOT being rolled back (no "
|
||||
"triggered_by_assistant_turn_id back-reference). "
|
||||
"Affected event ids: %s",
|
||||
len(skipped_no_backref),
|
||||
original_assistant_event_id,
|
||||
[r[0] for r in unrolled_lifecycle],
|
||||
skipped_no_backref,
|
||||
)
|
||||
|
||||
# 1a. Look up any sibling interjection beat in the same turn group
|
||||
@@ -716,11 +794,13 @@ async def regenerate_assistant_turn(
|
||||
# runs inline after a completion so promotion artifacts land in the
|
||||
# same regenerate path.
|
||||
#
|
||||
# T83.4 follow-up: when a regenerate replaces a turn that had
|
||||
# already produced event transitions, those original transitions
|
||||
# are NOT undone here (Phase 4 work). A WARNING log earlier in this
|
||||
# function names the affected event_log ids — see the T83.4 block
|
||||
# near the function entry.
|
||||
# T114.3: original-turn transitions emitted before this regenerate
|
||||
# ran were rolled back at the top of the function (see the
|
||||
# ``# T114.3`` block) by appending ``event_status_reverted`` for
|
||||
# each. The classify-and-emit pass below now operates against an
|
||||
# ``events`` projection that has already been reverted, so it can
|
||||
# safely re-fire transitions for the regenerated narrative without
|
||||
# double-emitting promotion artifacts.
|
||||
new_active_events = list_active_events(conn, chat_id)
|
||||
if new_active_events:
|
||||
lifecycle_decision = await detect_event_transitions(
|
||||
@@ -738,6 +818,12 @@ async def regenerate_assistant_turn(
|
||||
payload={
|
||||
"event_id": transition.event_id,
|
||||
"started_at": chat.get("time"),
|
||||
# T114.1: back-reference to the assistant_turn
|
||||
# that triggered this transition (see turns.py
|
||||
# for rationale).
|
||||
"triggered_by_assistant_turn_id": (
|
||||
new_assistant_event_id
|
||||
),
|
||||
},
|
||||
)
|
||||
elif transition.new_status == "completed":
|
||||
@@ -747,6 +833,10 @@ async def regenerate_assistant_turn(
|
||||
payload={
|
||||
"event_id": transition.event_id,
|
||||
"completed_at": chat.get("time"),
|
||||
# T114.1: back-reference (see above).
|
||||
"triggered_by_assistant_turn_id": (
|
||||
new_assistant_event_id
|
||||
),
|
||||
},
|
||||
)
|
||||
promote_completed_event(
|
||||
@@ -762,6 +852,10 @@ async def regenerate_assistant_turn(
|
||||
payload={
|
||||
"event_id": transition.event_id,
|
||||
"completed_at": chat.get("time"),
|
||||
# T114.1: back-reference (see above).
|
||||
"triggered_by_assistant_turn_id": (
|
||||
new_assistant_event_id
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -67,6 +67,29 @@ def _apply_event_expired(conn: Connection, e: Event) -> None:
|
||||
)
|
||||
|
||||
|
||||
@on("event_status_reverted")
|
||||
def _apply_event_status_reverted(conn: Connection, e: Event) -> None:
|
||||
"""T114.2: Revert an event row's status to ``prior_status``.
|
||||
|
||||
Emitted by ``regenerate_assistant_turn`` when a superseded turn had
|
||||
triggered a lifecycle transition (event_started / event_completed /
|
||||
event_cancelled). The rollback step needs an inverse projection that
|
||||
sets the row's status back to whatever it was *before* the now-
|
||||
superseded transition fired.
|
||||
|
||||
Unlike the forward transitions (which guard against terminal-status
|
||||
overwrites) this handler is unconditional — the entire purpose is to
|
||||
reverse a transition, including reverting from a terminal status
|
||||
(completed/cancelled) back to a non-terminal one.
|
||||
"""
|
||||
p = e.payload
|
||||
conn.execute(
|
||||
"UPDATE events SET status = ?, updated_at = datetime('now') "
|
||||
"WHERE event_id = ?",
|
||||
(p["prior_status"], p["event_id"]),
|
||||
)
|
||||
|
||||
|
||||
def get_event(conn: Connection, event_id: str) -> dict | None:
|
||||
row = conn.execute(
|
||||
"SELECT event_id, chat_id, kind, status, props_json, planned_for, "
|
||||
|
||||
@@ -812,6 +812,14 @@ async def post_turn(
|
||||
payload={
|
||||
"event_id": transition.event_id,
|
||||
"started_at": chat.get("time"),
|
||||
# T114.1: back-reference to the assistant_turn that
|
||||
# triggered this transition. Regenerate uses this
|
||||
# to roll back lifecycle transitions when the turn
|
||||
# is superseded. Forward-only — older events
|
||||
# without this field are skipped by rollback.
|
||||
"triggered_by_assistant_turn_id": (
|
||||
primary_assistant_event_id
|
||||
),
|
||||
},
|
||||
)
|
||||
elif transition.new_status == "completed":
|
||||
@@ -821,6 +829,10 @@ async def post_turn(
|
||||
payload={
|
||||
"event_id": transition.event_id,
|
||||
"completed_at": chat.get("time"),
|
||||
# T114.1: back-reference (see above).
|
||||
"triggered_by_assistant_turn_id": (
|
||||
primary_assistant_event_id
|
||||
),
|
||||
},
|
||||
)
|
||||
# Run promotion inline so the artifact-emitting events
|
||||
@@ -842,6 +854,10 @@ async def post_turn(
|
||||
payload={
|
||||
"event_id": transition.event_id,
|
||||
"completed_at": chat.get("time"),
|
||||
# T114.1: back-reference (see above).
|
||||
"triggered_by_assistant_turn_id": (
|
||||
primary_assistant_event_id
|
||||
),
|
||||
},
|
||||
)
|
||||
# Any other ``new_status`` value falls through silently —
|
||||
|
||||
@@ -233,3 +233,91 @@ def test_list_active_events_filters_to_planned_and_active(tmp_path):
|
||||
|
||||
cancelled = list_events_in_status(conn, "chat_bot_a", "cancelled")
|
||||
assert [e["event_id"] for e in cancelled] == ["evt_canx"]
|
||||
|
||||
|
||||
def test_event_status_reverted_returns_to_prior_status(tmp_path):
|
||||
"""T114.2: ``event_status_reverted`` rolls a row back to ``prior_status``.
|
||||
|
||||
Unlike the forward transitions, this projector handler is
|
||||
unconditional — its sole purpose is to undo a transition, including
|
||||
reverting from a terminal status (completed/cancelled) back to a
|
||||
non-terminal one.
|
||||
|
||||
Three round-trips covered:
|
||||
- completed → active (rollback of an event_completed)
|
||||
- active → planned (rollback of an event_started)
|
||||
- cancelled → active (rollback of an event_cancelled)
|
||||
"""
|
||||
db = tmp_path / "t.db"
|
||||
apply_migrations(db)
|
||||
with open_db(db) as conn:
|
||||
_seed_chat(conn)
|
||||
append_event(
|
||||
conn,
|
||||
kind="event_planned",
|
||||
payload={
|
||||
"event_id": "evt_revert",
|
||||
"chat_id": "chat_bot_a",
|
||||
"kind": "date_at_park",
|
||||
"props": {},
|
||||
"planned_for": "2026-04-30T18:00:00+00:00",
|
||||
},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="event_started",
|
||||
payload={
|
||||
"event_id": "evt_revert",
|
||||
"started_at": "2026-04-30T18:01:00+00:00",
|
||||
},
|
||||
)
|
||||
append_event(
|
||||
conn,
|
||||
kind="event_completed",
|
||||
payload={
|
||||
"event_id": "evt_revert",
|
||||
"completed_at": "2026-04-30T20:00:00+00:00",
|
||||
},
|
||||
)
|
||||
project(conn)
|
||||
|
||||
ev = get_event(conn, "evt_revert")
|
||||
assert ev is not None
|
||||
assert ev["status"] == "completed"
|
||||
|
||||
# Revert from completed → active.
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_status_reverted",
|
||||
payload={"event_id": "evt_revert", "prior_status": "active"},
|
||||
)
|
||||
ev = get_event(conn, "evt_revert")
|
||||
assert ev["status"] == "active"
|
||||
|
||||
# Revert from active → planned.
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_status_reverted",
|
||||
payload={"event_id": "evt_revert", "prior_status": "planned"},
|
||||
)
|
||||
ev = get_event(conn, "evt_revert")
|
||||
assert ev["status"] == "planned"
|
||||
|
||||
# Forward to cancelled, then revert from cancelled → active.
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_cancelled",
|
||||
payload={
|
||||
"event_id": "evt_revert",
|
||||
"completed_at": "2026-04-30T20:30:00+00:00",
|
||||
},
|
||||
)
|
||||
ev = get_event(conn, "evt_revert")
|
||||
assert ev["status"] == "cancelled"
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_status_reverted",
|
||||
payload={"event_id": "evt_revert", "prior_status": "active"},
|
||||
)
|
||||
ev = get_event(conn, "evt_revert")
|
||||
assert ev["status"] == "active"
|
||||
|
||||
@@ -1022,3 +1022,346 @@ def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch):
|
||||
assert isinstance(in_flight_snapshot.get("task"), asyncio.Task)
|
||||
# Post-flight: the entry has been cleaned up.
|
||||
assert "chat_bot_a" not in _in_flight_tasks
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# T114: lifecycle rollback. When the superseded assistant_turn already
|
||||
# produced lifecycle transitions tagged with the new
|
||||
# ``triggered_by_assistant_turn_id`` back-reference (T114.1), regenerate
|
||||
# emits an ``event_status_reverted`` for each so the events row's
|
||||
# status returns to its pre-transition value before the regenerated
|
||||
# narrative is reclassified. Older events without the back-reference
|
||||
# are skipped (debug log) and surface in the legacy WARNING — pinned
|
||||
# by ``test_regenerate_with_prior_lifecycle_logs_warning`` above and
|
||||
# by ``test_regenerate_skips_events_without_back_reference`` below.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _seed_event_with_lifecycle(
|
||||
db_path,
|
||||
*,
|
||||
event_id: str,
|
||||
triggered_by_assistant_turn_id: int,
|
||||
forward_kinds: list[str],
|
||||
):
|
||||
"""Helper: seed an events row and replay lifecycle transitions tagged
|
||||
with ``triggered_by_assistant_turn_id`` so T114 rollback fires.
|
||||
|
||||
``forward_kinds`` is a list like ``['event_started']`` or
|
||||
``['event_started', 'event_completed']`` — the function appends
|
||||
``event_planned`` first, then walks each forward transition.
|
||||
"""
|
||||
from chat.eventlog.log import append_and_apply
|
||||
|
||||
with open_db(db_path) as conn:
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_planned",
|
||||
payload={
|
||||
"event_id": event_id,
|
||||
"chat_id": "chat_bot_a",
|
||||
"kind": "story_event",
|
||||
"props": {},
|
||||
"planned_for": "2026-04-30T18:00:00+00:00",
|
||||
},
|
||||
)
|
||||
for kind in forward_kinds:
|
||||
payload: dict = {
|
||||
"event_id": event_id,
|
||||
"triggered_by_assistant_turn_id": (
|
||||
triggered_by_assistant_turn_id
|
||||
),
|
||||
}
|
||||
if kind == "event_started":
|
||||
payload["started_at"] = "2026-04-30T19:00:00+00:00"
|
||||
else:
|
||||
payload["completed_at"] = "2026-04-30T19:30:00+00:00"
|
||||
append_and_apply(conn, kind=kind, payload=payload)
|
||||
|
||||
|
||||
def test_regenerate_rolls_back_event_started_from_superseded_turn(
|
||||
tmp_path, monkeypatch
|
||||
):
|
||||
"""T114.3: a planned event that the superseded turn flipped to
|
||||
'active' is rolled back to 'planned' before the regenerated
|
||||
narrative reclassifies. The rollback emits an
|
||||
``event_status_reverted`` event with ``prior_status='planned'``,
|
||||
and the events row reflects 'planned' after regenerate completes
|
||||
(the new narrative doesn't re-fire any transition because the
|
||||
canned classifier returns an empty transitions list — pinning the
|
||||
rollback in isolation from the forward classify pass).
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
from chat.config import Settings
|
||||
from chat.db.migrate import apply_migrations
|
||||
from chat.services.regenerate import regenerate_assistant_turn
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
cfg = tmp_path / "config.toml"
|
||||
cfg.write_text('featherless_api_key = "test"\n')
|
||||
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
|
||||
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
|
||||
apply_migrations(db_path)
|
||||
|
||||
_ut_id, at_id = _seed_with_one_turn(db_path)
|
||||
_seed_event_with_lifecycle(
|
||||
db_path,
|
||||
event_id="evt_started",
|
||||
triggered_by_assistant_turn_id=at_id,
|
||||
forward_kinds=["event_started"],
|
||||
)
|
||||
|
||||
# Sanity: events row is currently 'active'.
|
||||
with open_db(db_path) as conn:
|
||||
status = conn.execute(
|
||||
"SELECT status FROM events WHERE event_id = ?", ("evt_started",)
|
||||
).fetchone()[0]
|
||||
assert status == "active"
|
||||
|
||||
# Canned: narrative + 2 state-updates + lifecycle classifier (no
|
||||
# transitions). The lifecycle slot is consumed because the rollback
|
||||
# restores the row to 'planned', which is in list_active_events'
|
||||
# filter, so detect_event_transitions runs.
|
||||
state_canned = json.dumps(
|
||||
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
|
||||
)
|
||||
no_transitions = json.dumps({"transitions": []})
|
||||
mock_client = MockLLMClient(
|
||||
canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
|
||||
)
|
||||
settings = Settings(featherless_api_key="test")
|
||||
|
||||
with open_db(db_path) as conn:
|
||||
asyncio.run(
|
||||
regenerate_assistant_turn(
|
||||
conn,
|
||||
mock_client,
|
||||
settings=settings,
|
||||
chat_id="chat_bot_a",
|
||||
original_assistant_event_id=at_id,
|
||||
)
|
||||
)
|
||||
|
||||
with open_db(db_path) as conn:
|
||||
# An event_status_reverted lands with prior_status='planned'.
|
||||
rev_rows = conn.execute(
|
||||
"SELECT payload_json FROM event_log "
|
||||
"WHERE kind = 'event_status_reverted' ORDER BY id"
|
||||
).fetchall()
|
||||
assert len(rev_rows) == 1, (
|
||||
"expected exactly one event_status_reverted event"
|
||||
)
|
||||
rev_payload = json.loads(rev_rows[0][0])
|
||||
assert rev_payload["event_id"] == "evt_started"
|
||||
assert rev_payload["prior_status"] == "planned"
|
||||
|
||||
# Events projection: status is back to 'planned'.
|
||||
status = conn.execute(
|
||||
"SELECT status FROM events WHERE event_id = ?",
|
||||
("evt_started",),
|
||||
).fetchone()[0]
|
||||
assert status == "planned"
|
||||
|
||||
|
||||
def test_regenerate_rolls_back_event_completed_to_active(tmp_path, monkeypatch):
|
||||
"""T114.3: a completed event whose completion was triggered by the
|
||||
superseded turn rolls back to 'active'. Mirrors the started→planned
|
||||
case but exercises the 'completed → active' branch of
|
||||
``_PRIOR_STATUS_MAP`` in regenerate.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
from chat.config import Settings
|
||||
from chat.db.migrate import apply_migrations
|
||||
from chat.services.regenerate import regenerate_assistant_turn
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
cfg = tmp_path / "config.toml"
|
||||
cfg.write_text('featherless_api_key = "test"\n')
|
||||
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
|
||||
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
|
||||
apply_migrations(db_path)
|
||||
|
||||
_ut_id, at_id = _seed_with_one_turn(db_path)
|
||||
# The forward sequence here pretends the prior turn ALSO authored
|
||||
# the start (which is realistic — a single turn flow could go
|
||||
# planned → active → completed across multiple events). Tagging
|
||||
# both with the same back-reference exercises the multi-rollback
|
||||
# loop (one per affected lifecycle row).
|
||||
_seed_event_with_lifecycle(
|
||||
db_path,
|
||||
event_id="evt_completed",
|
||||
triggered_by_assistant_turn_id=at_id,
|
||||
forward_kinds=["event_started", "event_completed"],
|
||||
)
|
||||
|
||||
# Sanity: events row is 'completed'.
|
||||
with open_db(db_path) as conn:
|
||||
status = conn.execute(
|
||||
"SELECT status FROM events WHERE event_id = ?", ("evt_completed",)
|
||||
).fetchone()[0]
|
||||
assert status == "completed"
|
||||
|
||||
state_canned = json.dumps(
|
||||
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
|
||||
)
|
||||
no_transitions = json.dumps({"transitions": []})
|
||||
mock_client = MockLLMClient(
|
||||
canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
|
||||
)
|
||||
settings = Settings(featherless_api_key="test")
|
||||
|
||||
with open_db(db_path) as conn:
|
||||
asyncio.run(
|
||||
regenerate_assistant_turn(
|
||||
conn,
|
||||
mock_client,
|
||||
settings=settings,
|
||||
chat_id="chat_bot_a",
|
||||
original_assistant_event_id=at_id,
|
||||
)
|
||||
)
|
||||
|
||||
with open_db(db_path) as conn:
|
||||
# Two event_status_reverted rows land — one per forward
|
||||
# transition that carried the back-reference. Both target the
|
||||
# same event_id but with different prior_status values
|
||||
# (in event_log id order: started→planned, completed→active).
|
||||
rev_rows = conn.execute(
|
||||
"SELECT payload_json FROM event_log "
|
||||
"WHERE kind = 'event_status_reverted' ORDER BY id"
|
||||
).fetchall()
|
||||
assert len(rev_rows) == 2
|
||||
rev_payloads = [json.loads(r[0]) for r in rev_rows]
|
||||
assert rev_payloads[0] == {
|
||||
"event_id": "evt_completed",
|
||||
"prior_status": "planned",
|
||||
}
|
||||
assert rev_payloads[1] == {
|
||||
"event_id": "evt_completed",
|
||||
"prior_status": "active",
|
||||
}
|
||||
|
||||
# Events projection: the LAST applied event_status_reverted
|
||||
# wins (active). That's the desired final state for a turn
|
||||
# that was originally a started+completed double-step.
|
||||
status = conn.execute(
|
||||
"SELECT status FROM events WHERE event_id = ?",
|
||||
("evt_completed",),
|
||||
).fetchone()[0]
|
||||
assert status == "active"
|
||||
|
||||
|
||||
def test_regenerate_skips_events_without_back_reference(
|
||||
tmp_path, monkeypatch, caplog
|
||||
):
|
||||
"""T114.3 backward compatibility: lifecycle events authored before
|
||||
T114.1 lack the ``triggered_by_assistant_turn_id`` payload field.
|
||||
Regenerate must NOT emit ``event_status_reverted`` for such rows —
|
||||
they're skipped (with a DEBUG log). The legacy T83.4 WARNING about
|
||||
un-rolled-back transitions still fires for visibility.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from chat.config import Settings
|
||||
from chat.db.migrate import apply_migrations
|
||||
from chat.eventlog.log import append_and_apply
|
||||
from chat.services.regenerate import regenerate_assistant_turn
|
||||
|
||||
db_path = tmp_path / "test.db"
|
||||
cfg = tmp_path / "config.toml"
|
||||
cfg.write_text('featherless_api_key = "test"\n')
|
||||
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
|
||||
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
|
||||
apply_migrations(db_path)
|
||||
|
||||
_ut_id, at_id = _seed_with_one_turn(db_path)
|
||||
|
||||
# Seed a lifecycle transition WITHOUT the back-reference field —
|
||||
# mimicking pre-T114.1 event_log rows.
|
||||
with open_db(db_path) as conn:
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_planned",
|
||||
payload={
|
||||
"event_id": "evt_legacy",
|
||||
"chat_id": "chat_bot_a",
|
||||
"kind": "story_event",
|
||||
"props": {},
|
||||
"planned_for": "2026-04-30T18:00:00+00:00",
|
||||
},
|
||||
)
|
||||
append_and_apply(
|
||||
conn,
|
||||
kind="event_started",
|
||||
payload={
|
||||
"event_id": "evt_legacy",
|
||||
"started_at": "2026-04-30T19:00:00+00:00",
|
||||
# NOTE: no triggered_by_assistant_turn_id — pre-T114.1
|
||||
# legacy row.
|
||||
},
|
||||
)
|
||||
|
||||
state_canned = json.dumps(
|
||||
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
|
||||
)
|
||||
no_transitions = json.dumps({"transitions": []})
|
||||
mock_client = MockLLMClient(
|
||||
canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
|
||||
)
|
||||
settings = Settings(featherless_api_key="test")
|
||||
|
||||
caplog.set_level(logging.DEBUG, logger="chat.services.regenerate")
|
||||
|
||||
with open_db(db_path) as conn:
|
||||
asyncio.run(
|
||||
regenerate_assistant_turn(
|
||||
conn,
|
||||
mock_client,
|
||||
settings=settings,
|
||||
chat_id="chat_bot_a",
|
||||
original_assistant_event_id=at_id,
|
||||
)
|
||||
)
|
||||
|
||||
with open_db(db_path) as conn:
|
||||
# No event_status_reverted was emitted for the legacy row.
|
||||
rev_count = conn.execute(
|
||||
"SELECT COUNT(*) FROM event_log "
|
||||
"WHERE kind = 'event_status_reverted'"
|
||||
).fetchone()[0]
|
||||
assert rev_count == 0
|
||||
|
||||
# Events row is still 'active' — the legacy transition stands.
|
||||
status = conn.execute(
|
||||
"SELECT status FROM events WHERE event_id = ?",
|
||||
("evt_legacy",),
|
||||
).fetchone()[0]
|
||||
assert status == "active"
|
||||
|
||||
# Debug log surfaces the skipped row.
|
||||
debugs = [
|
||||
r.getMessage()
|
||||
for r in caplog.records
|
||||
if r.levelname == "DEBUG"
|
||||
]
|
||||
assert any(
|
||||
"skipping rollback for lifecycle event_log" in m for m in debugs
|
||||
), f"expected DEBUG about skipped legacy row; got: {debugs}"
|
||||
|
||||
# Legacy WARNING still fires so operators see un-rolled-back rows.
|
||||
warnings = [
|
||||
r.getMessage()
|
||||
for r in caplog.records
|
||||
if r.levelname == "WARNING"
|
||||
and "lifecycle transition" in r.getMessage()
|
||||
]
|
||||
assert warnings, (
|
||||
"expected WARNING about un-rolled-back legacy lifecycle "
|
||||
f"transitions; got records: "
|
||||
f"{[r.getMessage() for r in caplog.records]}"
|
||||
)
|
||||
# The new wording references the missing back-reference field.
|
||||
assert "triggered_by_assistant_turn_id" in warnings[0]
|
||||
|
||||
@@ -1023,6 +1023,18 @@ def test_turn_with_event_transition_appends_started_event(
|
||||
assert started_payload["event_id"] == "evt_1"
|
||||
assert started_payload["started_at"] == "2026-04-26T20:00:00+00:00"
|
||||
|
||||
# T114.1: payload carries the back-reference to the assistant_turn
|
||||
# that triggered the transition. The assistant_turn lands in
|
||||
# event_log immediately before the event_started, so its id is
|
||||
# the largest assistant_turn id in the chat at this point.
|
||||
at_id = conn.execute(
|
||||
"SELECT id FROM event_log "
|
||||
"WHERE kind = 'assistant_turn' "
|
||||
" AND json_extract(payload_json, '$.chat_id') = 'chat_bot_a' "
|
||||
"ORDER BY id DESC LIMIT 1"
|
||||
).fetchone()[0]
|
||||
assert started_payload["triggered_by_assistant_turn_id"] == at_id
|
||||
|
||||
# The events projection row reflects the active status.
|
||||
ev_row = conn.execute(
|
||||
"SELECT status, started_at FROM events WHERE event_id = ?",
|
||||
|
||||
Reference in New Issue
Block a user