From 7370f68bdfd031ae9bf27311c4a278bede025ccc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 27 Apr 2026 06:38:48 -0400 Subject: [PATCH 1/3] feat: lifecycle events carry triggered_by_assistant_turn_id back-reference (T114.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3.5 T83.4 surfaced un-rolled-back lifecycle transitions on regenerate; T114 wires up the actual rollback. Step 1 is the back- reference: every event_started / event_completed / event_cancelled emitted by post_turn (chat/web/turns.py) and regenerate (chat/services/regenerate.py) now carries ``triggered_by_assistant_turn_id`` in its payload, set to the id of the assistant_turn event that produced the transition. Schema decision (Option A from the plan): no migration. The field is a payload convention only — older event_log rows lack it and rollback will skip them with a debug log when T114.3 lands. Forward-only. The post_turn lifecycle block already runs AFTER the assistant_turn event is appended (step 8a vs step 7), so ``primary_assistant_event_id`` is in scope. Same for regenerate: the lifecycle classification (step 9a) runs after step 6's append. **No emission-order reorder was needed** in either flow. Updates ``test_turn_with_event_transition_appends_started_event`` to assert the new field is present in the emitted event_started payload and points at the assistant_turn id. --- chat/services/regenerate.py | 14 ++++++++++++++ chat/web/turns.py | 16 ++++++++++++++++ tests/test_turn_flow.py | 12 ++++++++++++ 3 files changed, 42 insertions(+) diff --git a/chat/services/regenerate.py b/chat/services/regenerate.py index 6442bb2..bceaf16 100644 --- a/chat/services/regenerate.py +++ b/chat/services/regenerate.py @@ -738,6 +738,12 @@ async def regenerate_assistant_turn( payload={ "event_id": transition.event_id, "started_at": chat.get("time"), + # T114.1: back-reference to the assistant_turn + # that triggered this transition (see turns.py + # for rationale). + "triggered_by_assistant_turn_id": ( + new_assistant_event_id + ), }, ) elif transition.new_status == "completed": @@ -747,6 +753,10 @@ async def regenerate_assistant_turn( payload={ "event_id": transition.event_id, "completed_at": chat.get("time"), + # T114.1: back-reference (see above). + "triggered_by_assistant_turn_id": ( + new_assistant_event_id + ), }, ) promote_completed_event( @@ -762,6 +772,10 @@ async def regenerate_assistant_turn( payload={ "event_id": transition.event_id, "completed_at": chat.get("time"), + # T114.1: back-reference (see above). + "triggered_by_assistant_turn_id": ( + new_assistant_event_id + ), }, ) diff --git a/chat/web/turns.py b/chat/web/turns.py index dfb4b21..623390d 100644 --- a/chat/web/turns.py +++ b/chat/web/turns.py @@ -812,6 +812,14 @@ async def post_turn( payload={ "event_id": transition.event_id, "started_at": chat.get("time"), + # T114.1: back-reference to the assistant_turn that + # triggered this transition. Regenerate uses this + # to roll back lifecycle transitions when the turn + # is superseded. Forward-only — older events + # without this field are skipped by rollback. + "triggered_by_assistant_turn_id": ( + primary_assistant_event_id + ), }, ) elif transition.new_status == "completed": @@ -821,6 +829,10 @@ async def post_turn( payload={ "event_id": transition.event_id, "completed_at": chat.get("time"), + # T114.1: back-reference (see above). + "triggered_by_assistant_turn_id": ( + primary_assistant_event_id + ), }, ) # Run promotion inline so the artifact-emitting events @@ -842,6 +854,10 @@ async def post_turn( payload={ "event_id": transition.event_id, "completed_at": chat.get("time"), + # T114.1: back-reference (see above). + "triggered_by_assistant_turn_id": ( + primary_assistant_event_id + ), }, ) # Any other ``new_status`` value falls through silently — diff --git a/tests/test_turn_flow.py b/tests/test_turn_flow.py index 9d3fd0f..50209cb 100644 --- a/tests/test_turn_flow.py +++ b/tests/test_turn_flow.py @@ -1023,6 +1023,18 @@ def test_turn_with_event_transition_appends_started_event( assert started_payload["event_id"] == "evt_1" assert started_payload["started_at"] == "2026-04-26T20:00:00+00:00" + # T114.1: payload carries the back-reference to the assistant_turn + # that triggered the transition. The assistant_turn lands in + # event_log immediately before the event_started, so its id is + # the largest assistant_turn id in the chat at this point. + at_id = conn.execute( + "SELECT id FROM event_log " + "WHERE kind = 'assistant_turn' " + " AND json_extract(payload_json, '$.chat_id') = 'chat_bot_a' " + "ORDER BY id DESC LIMIT 1" + ).fetchone()[0] + assert started_payload["triggered_by_assistant_turn_id"] == at_id + # The events projection row reflects the active status. ev_row = conn.execute( "SELECT status, started_at FROM events WHERE event_id = ?", From 6d4ad86e3375b888587827b1e5cf06d162761493 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 27 Apr 2026 06:39:03 -0400 Subject: [PATCH 2/3] feat: event_status_reverted event kind + projector handler (T114.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the inverse projection used by T114.3's regenerate rollback. The new ``event_status_reverted`` event kind carries ``{event_id, prior_status}`` and the handler unconditionally sets ``events.status = prior_status`` for the row. Unlike the forward transitions (event_started / event_completed / event_cancelled), this handler does NOT guard against terminal statuses — its entire purpose is to reverse a transition, including walking back from a terminal status to a non-terminal one. Without that, rolling back an event_completed (status='completed' is terminal for the forward handlers) would silently no-op and leave the row in the post-superseded state. The handler registers via the existing ``@on(kind)`` decorator pattern in chat/eventlog/projector.py, so future replays of an event_log that contains event_status_reverted rows pick it up automatically. Test exercises completed→active, active→planned, and cancelled→active round-trips. --- chat/state/events.py | 23 ++++++++++ tests/test_events_state.py | 88 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/chat/state/events.py b/chat/state/events.py index b2f0b1c..13b2424 100644 --- a/chat/state/events.py +++ b/chat/state/events.py @@ -67,6 +67,29 @@ def _apply_event_expired(conn: Connection, e: Event) -> None: ) +@on("event_status_reverted") +def _apply_event_status_reverted(conn: Connection, e: Event) -> None: + """T114.2: Revert an event row's status to ``prior_status``. + + Emitted by ``regenerate_assistant_turn`` when a superseded turn had + triggered a lifecycle transition (event_started / event_completed / + event_cancelled). The rollback step needs an inverse projection that + sets the row's status back to whatever it was *before* the now- + superseded transition fired. + + Unlike the forward transitions (which guard against terminal-status + overwrites) this handler is unconditional — the entire purpose is to + reverse a transition, including reverting from a terminal status + (completed/cancelled) back to a non-terminal one. + """ + p = e.payload + conn.execute( + "UPDATE events SET status = ?, updated_at = datetime('now') " + "WHERE event_id = ?", + (p["prior_status"], p["event_id"]), + ) + + def get_event(conn: Connection, event_id: str) -> dict | None: row = conn.execute( "SELECT event_id, chat_id, kind, status, props_json, planned_for, " diff --git a/tests/test_events_state.py b/tests/test_events_state.py index 6ced284..6259bc0 100644 --- a/tests/test_events_state.py +++ b/tests/test_events_state.py @@ -233,3 +233,91 @@ def test_list_active_events_filters_to_planned_and_active(tmp_path): cancelled = list_events_in_status(conn, "chat_bot_a", "cancelled") assert [e["event_id"] for e in cancelled] == ["evt_canx"] + + +def test_event_status_reverted_returns_to_prior_status(tmp_path): + """T114.2: ``event_status_reverted`` rolls a row back to ``prior_status``. + + Unlike the forward transitions, this projector handler is + unconditional — its sole purpose is to undo a transition, including + reverting from a terminal status (completed/cancelled) back to a + non-terminal one. + + Three round-trips covered: + - completed → active (rollback of an event_completed) + - active → planned (rollback of an event_started) + - cancelled → active (rollback of an event_cancelled) + """ + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + append_event( + conn, + kind="event_planned", + payload={ + "event_id": "evt_revert", + "chat_id": "chat_bot_a", + "kind": "date_at_park", + "props": {}, + "planned_for": "2026-04-30T18:00:00+00:00", + }, + ) + append_event( + conn, + kind="event_started", + payload={ + "event_id": "evt_revert", + "started_at": "2026-04-30T18:01:00+00:00", + }, + ) + append_event( + conn, + kind="event_completed", + payload={ + "event_id": "evt_revert", + "completed_at": "2026-04-30T20:00:00+00:00", + }, + ) + project(conn) + + ev = get_event(conn, "evt_revert") + assert ev is not None + assert ev["status"] == "completed" + + # Revert from completed → active. + append_and_apply( + conn, + kind="event_status_reverted", + payload={"event_id": "evt_revert", "prior_status": "active"}, + ) + ev = get_event(conn, "evt_revert") + assert ev["status"] == "active" + + # Revert from active → planned. + append_and_apply( + conn, + kind="event_status_reverted", + payload={"event_id": "evt_revert", "prior_status": "planned"}, + ) + ev = get_event(conn, "evt_revert") + assert ev["status"] == "planned" + + # Forward to cancelled, then revert from cancelled → active. + append_and_apply( + conn, + kind="event_cancelled", + payload={ + "event_id": "evt_revert", + "completed_at": "2026-04-30T20:30:00+00:00", + }, + ) + ev = get_event(conn, "evt_revert") + assert ev["status"] == "cancelled" + append_and_apply( + conn, + kind="event_status_reverted", + payload={"event_id": "evt_revert", "prior_status": "active"}, + ) + ev = get_event(conn, "evt_revert") + assert ev["status"] == "active" From 80ce891bd87816f1b10ba900e7b1906fd5b3111b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 27 Apr 2026 06:45:43 -0400 Subject: [PATCH 3/3] feat: regenerate rolls back lifecycle transitions on supersede (T114.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the T83.4 gap: when ``regenerate_assistant_turn`` supersedes an assistant_turn that already produced lifecycle transitions, it now emits an ``event_status_reverted`` (T114.2) for each transition tagged with ``triggered_by_assistant_turn_id == original_assistant_event_id`` (T114.1 back-reference) before the regenerated narrative is reclassified. Mapping from forward kind to ``prior_status`` lives in ``_PRIOR_STATUS_MAP``: - event_started → planned - event_completed → active - event_cancelled → active (best-effort default; cancellation can fire from either planned or active, but detect_event_transitions only surfaces currently-active rows so 'active' is the realistic prior) Backward compatibility: lifecycle rows authored before T114.1 lack the back-reference field. Those are skipped (DEBUG log per row) and collected into a legacy WARNING that preserves the T83.4 observability contract — operators still see un-rolled-back transitions, just from older logs. The classify-and-emit pass below the rollback now operates against an events projection that has already been reverted, so re-firing ``event_started``/``event_completed``/``event_cancelled`` for the regenerated narrative is safe — no double-emit of promotion artifacts. Spec tests: - ``test_regenerate_rolls_back_event_started_from_superseded_turn`` - ``test_regenerate_rolls_back_event_completed_to_active`` (also exercises the multi-rollback loop: a turn that fired both a start and a completion gets two event_status_reverted rows in id order, with active as the final projection — matching the per-row replay semantics of the projector) - ``test_regenerate_skips_events_without_back_reference`` (pins the legacy compatibility path with both DEBUG and WARNING expectations) --- chat/services/regenerate.py | 152 ++++++++++++---- tests/test_regenerate.py | 343 ++++++++++++++++++++++++++++++++++++ 2 files changed, 459 insertions(+), 36 deletions(-) diff --git a/chat/services/regenerate.py b/chat/services/regenerate.py index bceaf16..de88049 100644 --- a/chat/services/regenerate.py +++ b/chat/services/regenerate.py @@ -95,6 +95,27 @@ from chat.web.render import render_turn_html _log = logging.getLogger(__name__) +# T114.3: map a lifecycle-transition event kind to the events-table +# status it implicitly transitioned *from*. Regenerate uses this to pick +# the ``prior_status`` value for the ``event_status_reverted`` rollback +# event so the projector sets the row back to where it was before the +# superseded turn fired the transition. +# +# - ``event_started`` was emitted when the row was 'planned' → revert to +# 'planned'. +# - ``event_completed`` was emitted when the row was 'active' → revert +# to 'active'. +# - ``event_cancelled`` could have fired from either 'planned' or +# 'active'. Best-effort default: 'active'. The forward transitions +# below only fire detect_event_transitions for currently-active rows, +# so 'active' is the realistic prior in practice. +_PRIOR_STATUS_MAP: dict[str, str] = { + "event_started": "planned", + "event_completed": "active", + "event_cancelled": "active", +} + + async def regenerate_assistant_turn( conn: Connection, client, @@ -115,17 +136,18 @@ async def regenerate_assistant_turn( cannot be found — the FastAPI route translates this to 404. .. note:: - **Lifecycle-rollback limitation (T83.4, Phase 4 follow-up).** + **Lifecycle rollback (T114, Phase 4.5).** When the superseded turn already produced lifecycle transitions (``event_started`` / ``event_completed`` / ``event_cancelled``), - this function does NOT roll those rows back before re-running - ``detect_event_transitions`` against the regenerated text. A - regenerate-after-completion can therefore double-emit promotion - artifacts if the new text re-completes the same event. Phase 3.5 - only documents the gap and emits a WARNING log naming the - affected event_log ids; the actual undo pass is invasive - (re-projection / inverse-handler dispatch) and is deferred to - Phase 4. See the ``# T83.4`` block below for the warning emit. + this function emits an ``event_status_reverted`` event for each + so the events row's status returns to its prior value before the + regenerated narrative is reclassified. Backward compatibility: + lifecycle events authored before T114.1 lack the + ``triggered_by_assistant_turn_id`` payload field; rollback skips + those (logged at DEBUG) so historic rows are not retroactively + reverted. A WARNING about un-rolled-back transitions is still + emitted when stragglers are found — the rollback handles the + common case while older logs continue to need manual review. """ chat = get_chat(conn, chat_id) if chat is None: @@ -158,20 +180,21 @@ async def regenerate_assistant_turn( original_assistant_payload = json.loads(row[0]) original_user_turn_id = original_assistant_payload.get("user_turn_id") - # T83.4: scan for downstream lifecycle transitions emitted by the - # superseded turn — they're not being rolled back (see method - # docstring). Heuristic: any ``event_started`` / ``event_completed`` - # / ``event_cancelled`` event_log row with id strictly greater than - # the original assistant_turn's id was emitted as part of (or after) - # that turn's processing. Lifecycle events don't carry ``chat_id`` - # in their payload (their payload references an ``event_id`` FK to - # the ``events`` table, which holds chat_id), so we join through - # ``events`` to scope to this chat. - # - # A WARNING log surfaces the affected event ids so operators can - # spot double-emit cases until the Phase 4 rollback pass lands. + # T114.3: roll back lifecycle transitions emitted by the superseded + # turn. The scan uses the same id-greater-than-superseded-turn + # heuristic as the legacy T83.4 warning, joined to ``events`` for + # chat scoping (lifecycle events don't carry chat_id in their + # payload — they reference an ``event_id`` FK to the ``events`` + # table, which holds chat_id). For each row whose payload carries + # ``triggered_by_assistant_turn_id == original_assistant_event_id`` + # (T114.1 back-reference), emit an ``event_status_reverted`` event + # so the events-row status returns to the pre-transition value. + # Lifecycle rows authored before T114.1 lack the back-reference; + # those are skipped (DEBUG log) and a WARNING tracks their count so + # operators still see legacy stragglers — preserves the T83.4 + # observability contract for un-rolled-back transitions. unrolled_lifecycle = conn.execute( - "SELECT el.id, el.kind FROM event_log AS el " + "SELECT el.id, el.kind, el.payload_json FROM event_log AS el " "JOIN events AS ev " " ON ev.event_id = json_extract(el.payload_json, '$.event_id') " "WHERE el.kind IN (" @@ -182,18 +205,73 @@ async def regenerate_assistant_turn( "ORDER BY el.id ASC", (chat_id, original_assistant_event_id), ).fetchall() - if unrolled_lifecycle: - # T90.2: phrased as "at-or-after turn " rather than "from - # superseded turn" because regenerating an OLDER turn lists - # intervening-turn transitions that legitimately stand on their - # own — those weren't authored by the superseded turn itself. + rolled_back_ids: list[int] = [] + skipped_no_backref: list[int] = [] + for el_id, el_kind, el_payload_json in unrolled_lifecycle: + try: + lifecycle_payload = json.loads(el_payload_json) + except (TypeError, ValueError): + skipped_no_backref.append(el_id) + continue + triggered_by = lifecycle_payload.get("triggered_by_assistant_turn_id") + if triggered_by != original_assistant_event_id: + # Either a legacy row (no field) or a transition triggered + # by a *different* turn — leave it alone. DEBUG so the + # message is available under verbose logging without + # spamming the default WARNING channel. + _log.debug( + "regenerate_assistant_turn: skipping rollback for " + "lifecycle event_log id=%d (kind=%s) — no back-reference " + "or different turn (triggered_by=%r vs superseded=%d)", + el_id, + el_kind, + triggered_by, + original_assistant_event_id, + ) + if triggered_by is None: + skipped_no_backref.append(el_id) + continue + prior_status = _PRIOR_STATUS_MAP.get(el_kind) + if prior_status is None: + # Defensive: the SQL filter already restricts to the three + # known kinds, but a future schema addition shouldn't crash + # the rollback path. + continue + target_event_id = lifecycle_payload.get("event_id") + if target_event_id is None: + continue + append_and_apply( + conn, + kind="event_status_reverted", + payload={ + "event_id": target_event_id, + "prior_status": prior_status, + }, + ) + rolled_back_ids.append(el_id) + if rolled_back_ids: + _log.info( + "regenerate_assistant_turn: rolled back %d lifecycle " + "transition(s) triggered by superseded turn %s " + "(event_log ids: %s)", + len(rolled_back_ids), + original_assistant_event_id, + rolled_back_ids, + ) + if skipped_no_backref: + # T83.4 (legacy) compatibility: still warn about stragglers + # without the back-reference so operators can spot pre-T114 + # double-emit risks. Phrased as "at-or-after turn " per + # T90.2 — older transitions may legitimately belong to other + # turns. _log.warning( "regenerate_assistant_turn: %d lifecycle transition(s) " - "at-or-after turn %s are NOT being rolled back (Phase 4 " - "follow-up). Affected event ids: %s", - len(unrolled_lifecycle), + "at-or-after turn %s are NOT being rolled back (no " + "triggered_by_assistant_turn_id back-reference). " + "Affected event ids: %s", + len(skipped_no_backref), original_assistant_event_id, - [r[0] for r in unrolled_lifecycle], + skipped_no_backref, ) # 1a. Look up any sibling interjection beat in the same turn group @@ -716,11 +794,13 @@ async def regenerate_assistant_turn( # runs inline after a completion so promotion artifacts land in the # same regenerate path. # - # T83.4 follow-up: when a regenerate replaces a turn that had - # already produced event transitions, those original transitions - # are NOT undone here (Phase 4 work). A WARNING log earlier in this - # function names the affected event_log ids — see the T83.4 block - # near the function entry. + # T114.3: original-turn transitions emitted before this regenerate + # ran were rolled back at the top of the function (see the + # ``# T114.3`` block) by appending ``event_status_reverted`` for + # each. The classify-and-emit pass below now operates against an + # ``events`` projection that has already been reverted, so it can + # safely re-fire transitions for the regenerated narrative without + # double-emitting promotion artifacts. new_active_events = list_active_events(conn, chat_id) if new_active_events: lifecycle_decision = await detect_event_transitions( diff --git a/tests/test_regenerate.py b/tests/test_regenerate.py index b6d5e92..88e7422 100644 --- a/tests/test_regenerate.py +++ b/tests/test_regenerate.py @@ -1022,3 +1022,346 @@ def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch): assert isinstance(in_flight_snapshot.get("task"), asyncio.Task) # Post-flight: the entry has been cleaned up. assert "chat_bot_a" not in _in_flight_tasks + + +# --------------------------------------------------------------------------- +# T114: lifecycle rollback. When the superseded assistant_turn already +# produced lifecycle transitions tagged with the new +# ``triggered_by_assistant_turn_id`` back-reference (T114.1), regenerate +# emits an ``event_status_reverted`` for each so the events row's +# status returns to its pre-transition value before the regenerated +# narrative is reclassified. Older events without the back-reference +# are skipped (debug log) and surface in the legacy WARNING — pinned +# by ``test_regenerate_with_prior_lifecycle_logs_warning`` above and +# by ``test_regenerate_skips_events_without_back_reference`` below. +# --------------------------------------------------------------------------- + + +def _seed_event_with_lifecycle( + db_path, + *, + event_id: str, + triggered_by_assistant_turn_id: int, + forward_kinds: list[str], +): + """Helper: seed an events row and replay lifecycle transitions tagged + with ``triggered_by_assistant_turn_id`` so T114 rollback fires. + + ``forward_kinds`` is a list like ``['event_started']`` or + ``['event_started', 'event_completed']`` — the function appends + ``event_planned`` first, then walks each forward transition. + """ + from chat.eventlog.log import append_and_apply + + with open_db(db_path) as conn: + append_and_apply( + conn, + kind="event_planned", + payload={ + "event_id": event_id, + "chat_id": "chat_bot_a", + "kind": "story_event", + "props": {}, + "planned_for": "2026-04-30T18:00:00+00:00", + }, + ) + for kind in forward_kinds: + payload: dict = { + "event_id": event_id, + "triggered_by_assistant_turn_id": ( + triggered_by_assistant_turn_id + ), + } + if kind == "event_started": + payload["started_at"] = "2026-04-30T19:00:00+00:00" + else: + payload["completed_at"] = "2026-04-30T19:30:00+00:00" + append_and_apply(conn, kind=kind, payload=payload) + + +def test_regenerate_rolls_back_event_started_from_superseded_turn( + tmp_path, monkeypatch +): + """T114.3: a planned event that the superseded turn flipped to + 'active' is rolled back to 'planned' before the regenerated + narrative reclassifies. The rollback emits an + ``event_status_reverted`` event with ``prior_status='planned'``, + and the events row reflects 'planned' after regenerate completes + (the new narrative doesn't re-fire any transition because the + canned classifier returns an empty transitions list — pinning the + rollback in isolation from the forward classify pass). + """ + import asyncio + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + _ut_id, at_id = _seed_with_one_turn(db_path) + _seed_event_with_lifecycle( + db_path, + event_id="evt_started", + triggered_by_assistant_turn_id=at_id, + forward_kinds=["event_started"], + ) + + # Sanity: events row is currently 'active'. + with open_db(db_path) as conn: + status = conn.execute( + "SELECT status FROM events WHERE event_id = ?", ("evt_started",) + ).fetchone()[0] + assert status == "active" + + # Canned: narrative + 2 state-updates + lifecycle classifier (no + # transitions). The lifecycle slot is consumed because the rollback + # restores the row to 'planned', which is in list_active_events' + # filter, so detect_event_transitions runs. + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + no_transitions = json.dumps({"transitions": []}) + mock_client = MockLLMClient( + canned=["Refreshed reply.", state_canned, state_canned, no_transitions] + ) + settings = Settings(featherless_api_key="test") + + with open_db(db_path) as conn: + asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_bot_a", + original_assistant_event_id=at_id, + ) + ) + + with open_db(db_path) as conn: + # An event_status_reverted lands with prior_status='planned'. + rev_rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'event_status_reverted' ORDER BY id" + ).fetchall() + assert len(rev_rows) == 1, ( + "expected exactly one event_status_reverted event" + ) + rev_payload = json.loads(rev_rows[0][0]) + assert rev_payload["event_id"] == "evt_started" + assert rev_payload["prior_status"] == "planned" + + # Events projection: status is back to 'planned'. + status = conn.execute( + "SELECT status FROM events WHERE event_id = ?", + ("evt_started",), + ).fetchone()[0] + assert status == "planned" + + +def test_regenerate_rolls_back_event_completed_to_active(tmp_path, monkeypatch): + """T114.3: a completed event whose completion was triggered by the + superseded turn rolls back to 'active'. Mirrors the started→planned + case but exercises the 'completed → active' branch of + ``_PRIOR_STATUS_MAP`` in regenerate. + """ + import asyncio + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + _ut_id, at_id = _seed_with_one_turn(db_path) + # The forward sequence here pretends the prior turn ALSO authored + # the start (which is realistic — a single turn flow could go + # planned → active → completed across multiple events). Tagging + # both with the same back-reference exercises the multi-rollback + # loop (one per affected lifecycle row). + _seed_event_with_lifecycle( + db_path, + event_id="evt_completed", + triggered_by_assistant_turn_id=at_id, + forward_kinds=["event_started", "event_completed"], + ) + + # Sanity: events row is 'completed'. + with open_db(db_path) as conn: + status = conn.execute( + "SELECT status FROM events WHERE event_id = ?", ("evt_completed",) + ).fetchone()[0] + assert status == "completed" + + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + no_transitions = json.dumps({"transitions": []}) + mock_client = MockLLMClient( + canned=["Refreshed reply.", state_canned, state_canned, no_transitions] + ) + settings = Settings(featherless_api_key="test") + + with open_db(db_path) as conn: + asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_bot_a", + original_assistant_event_id=at_id, + ) + ) + + with open_db(db_path) as conn: + # Two event_status_reverted rows land — one per forward + # transition that carried the back-reference. Both target the + # same event_id but with different prior_status values + # (in event_log id order: started→planned, completed→active). + rev_rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'event_status_reverted' ORDER BY id" + ).fetchall() + assert len(rev_rows) == 2 + rev_payloads = [json.loads(r[0]) for r in rev_rows] + assert rev_payloads[0] == { + "event_id": "evt_completed", + "prior_status": "planned", + } + assert rev_payloads[1] == { + "event_id": "evt_completed", + "prior_status": "active", + } + + # Events projection: the LAST applied event_status_reverted + # wins (active). That's the desired final state for a turn + # that was originally a started+completed double-step. + status = conn.execute( + "SELECT status FROM events WHERE event_id = ?", + ("evt_completed",), + ).fetchone()[0] + assert status == "active" + + +def test_regenerate_skips_events_without_back_reference( + tmp_path, monkeypatch, caplog +): + """T114.3 backward compatibility: lifecycle events authored before + T114.1 lack the ``triggered_by_assistant_turn_id`` payload field. + Regenerate must NOT emit ``event_status_reverted`` for such rows — + they're skipped (with a DEBUG log). The legacy T83.4 WARNING about + un-rolled-back transitions still fires for visibility. + """ + import asyncio + import logging + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.eventlog.log import append_and_apply + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + _ut_id, at_id = _seed_with_one_turn(db_path) + + # Seed a lifecycle transition WITHOUT the back-reference field — + # mimicking pre-T114.1 event_log rows. + with open_db(db_path) as conn: + append_and_apply( + conn, + kind="event_planned", + payload={ + "event_id": "evt_legacy", + "chat_id": "chat_bot_a", + "kind": "story_event", + "props": {}, + "planned_for": "2026-04-30T18:00:00+00:00", + }, + ) + append_and_apply( + conn, + kind="event_started", + payload={ + "event_id": "evt_legacy", + "started_at": "2026-04-30T19:00:00+00:00", + # NOTE: no triggered_by_assistant_turn_id — pre-T114.1 + # legacy row. + }, + ) + + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + no_transitions = json.dumps({"transitions": []}) + mock_client = MockLLMClient( + canned=["Refreshed reply.", state_canned, state_canned, no_transitions] + ) + settings = Settings(featherless_api_key="test") + + caplog.set_level(logging.DEBUG, logger="chat.services.regenerate") + + with open_db(db_path) as conn: + asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_bot_a", + original_assistant_event_id=at_id, + ) + ) + + with open_db(db_path) as conn: + # No event_status_reverted was emitted for the legacy row. + rev_count = conn.execute( + "SELECT COUNT(*) FROM event_log " + "WHERE kind = 'event_status_reverted'" + ).fetchone()[0] + assert rev_count == 0 + + # Events row is still 'active' — the legacy transition stands. + status = conn.execute( + "SELECT status FROM events WHERE event_id = ?", + ("evt_legacy",), + ).fetchone()[0] + assert status == "active" + + # Debug log surfaces the skipped row. + debugs = [ + r.getMessage() + for r in caplog.records + if r.levelname == "DEBUG" + ] + assert any( + "skipping rollback for lifecycle event_log" in m for m in debugs + ), f"expected DEBUG about skipped legacy row; got: {debugs}" + + # Legacy WARNING still fires so operators see un-rolled-back rows. + warnings = [ + r.getMessage() + for r in caplog.records + if r.levelname == "WARNING" + and "lifecycle transition" in r.getMessage() + ] + assert warnings, ( + "expected WARNING about un-rolled-back legacy lifecycle " + f"transitions; got records: " + f"{[r.getMessage() for r in caplog.records]}" + ) + # The new wording references the missing back-reference field. + assert "triggered_by_assistant_turn_id" in warnings[0]