diff --git a/chat/services/regenerate.py b/chat/services/regenerate.py
index bceaf16..de88049 100644
--- a/chat/services/regenerate.py
+++ b/chat/services/regenerate.py
@@ -95,6 +95,27 @@ from chat.web.render import render_turn_html
 _log = logging.getLogger(__name__)
 
 
+# T114.3: map a lifecycle-transition event kind to the events-table
+# status it implicitly transitioned *from*. Regenerate uses this to pick
+# the ``prior_status`` value for the ``event_status_reverted`` rollback
+# event so the projector sets the row back to where it was before the
+# superseded turn fired the transition.
+#
+# - ``event_started`` was emitted when the row was 'planned' → revert to
+#   'planned'.
+# - ``event_completed`` was emitted when the row was 'active' → revert
+#   to 'active'.
+# - ``event_cancelled`` could have fired from either 'planned' or
+#   'active'. Best-effort default: 'active'. The forward transitions
+#   below only fire detect_event_transitions for currently-active rows,
+#   so 'active' is the realistic prior in practice.
+_PRIOR_STATUS_MAP: dict[str, str] = {
+    "event_started": "planned",
+    "event_completed": "active",
+    "event_cancelled": "active",
+}
+
+
 async def regenerate_assistant_turn(
     conn: Connection,
     client,
@@ -115,17 +136,18 @@ async def regenerate_assistant_turn(
     cannot be found — the FastAPI route translates this to 404.
 
     .. note::
-        **Lifecycle-rollback limitation (T83.4, Phase 4 follow-up).**
+        **Lifecycle rollback (T114, Phase 4.5).**
         When the superseded turn already produced lifecycle transitions
         (``event_started`` / ``event_completed`` / ``event_cancelled``),
-        this function does NOT roll those rows back before re-running
-        ``detect_event_transitions`` against the regenerated text. A
-        regenerate-after-completion can therefore double-emit promotion
-        artifacts if the new text re-completes the same event. Phase 3.5
-        only documents the gap and emits a WARNING log naming the
-        affected event_log ids; the actual undo pass is invasive
-        (re-projection / inverse-handler dispatch) and is deferred to
-        Phase 4. See the ``# T83.4`` block below for the warning emit.
+        this function emits an ``event_status_reverted`` event for each
+        so the events row's status returns to its prior value before the
+        regenerated narrative is reclassified. Backward compatibility:
+        lifecycle events authored before T114.1 lack the
+        ``triggered_by_assistant_turn_id`` payload field; rollback skips
+        those (logged at DEBUG) so historic rows are not retroactively
+        reverted. A WARNING about un-rolled-back transitions is still
+        emitted when stragglers are found — the rollback handles the
+        common case while older logs continue to need manual review.
     """
     chat = get_chat(conn, chat_id)
     if chat is None:
@@ -158,20 +180,21 @@ async def regenerate_assistant_turn(
     original_assistant_payload = json.loads(row[0])
     original_user_turn_id = original_assistant_payload.get("user_turn_id")
 
-    # T83.4: scan for downstream lifecycle transitions emitted by the
-    # superseded turn — they're not being rolled back (see method
-    # docstring). Heuristic: any ``event_started`` / ``event_completed``
-    # / ``event_cancelled`` event_log row with id strictly greater than
-    # the original assistant_turn's id was emitted as part of (or after)
-    # that turn's processing. Lifecycle events don't carry ``chat_id``
-    # in their payload (their payload references an ``event_id`` FK to
-    # the ``events`` table, which holds chat_id), so we join through
-    # ``events`` to scope to this chat.
-    #
-    # A WARNING log surfaces the affected event ids so operators can
-    # spot double-emit cases until the Phase 4 rollback pass lands.
+    # T114.3: roll back lifecycle transitions emitted by the superseded
+    # turn. The scan uses the same id-greater-than-superseded-turn
+    # heuristic as the legacy T83.4 warning, joined to ``events`` for
+    # chat scoping (lifecycle events don't carry chat_id in their
+    # payload — they reference an ``event_id`` FK to the ``events``
+    # table, which holds chat_id). For each row whose payload carries
+    # ``triggered_by_assistant_turn_id == original_assistant_event_id``
+    # (T114.1 back-reference), emit an ``event_status_reverted`` event
+    # so the events-row status returns to the pre-transition value.
+    # Lifecycle rows authored before T114.1 lack the back-reference;
+    # those are skipped (DEBUG log) and a WARNING tracks their count so
+    # operators still see legacy stragglers — preserves the T83.4
+    # observability contract for un-rolled-back transitions.
     unrolled_lifecycle = conn.execute(
-        "SELECT el.id, el.kind FROM event_log AS el "
+        "SELECT el.id, el.kind, el.payload_json FROM event_log AS el "
         "JOIN events AS ev "
         " ON ev.event_id = json_extract(el.payload_json, '$.event_id') "
         "WHERE el.kind IN ("
@@ -182,18 +205,78 @@ async def regenerate_assistant_turn(
         "ORDER BY el.id ASC",
         (chat_id, original_assistant_event_id),
     ).fetchall()
-    if unrolled_lifecycle:
-        # T90.2: phrased as "at-or-after turn " rather than "from
-        # superseded turn" because regenerating an OLDER turn lists
-        # intervening-turn transitions that legitimately stand on their
-        # own — those weren't authored by the superseded turn itself.
+    rolled_back_ids: list[int] = []
+    skipped_no_backref: list[int] = []
+    # Unwind newest-first. When the superseded turn fired more than one
+    # transition on the same event (planned → active → completed), the
+    # reverts must be applied in reverse event_log order so the LAST
+    # one applied restores the status held before the turn's FIRST
+    # transition; oldest-first would strand the row at 'active'.
+    for el_id, el_kind, el_payload_json in reversed(unrolled_lifecycle):
+        try:
+            lifecycle_payload = json.loads(el_payload_json)
+        except (TypeError, ValueError):
+            skipped_no_backref.append(el_id)
+            continue
+        triggered_by = lifecycle_payload.get("triggered_by_assistant_turn_id")
+        if triggered_by != original_assistant_event_id:
+            # Either a legacy row (no field) or a transition triggered
+            # by a *different* turn — leave it alone. DEBUG so the
+            # message is available under verbose logging without
+            # spamming the default WARNING channel.
+            _log.debug(
+                "regenerate_assistant_turn: skipping rollback for "
+                "lifecycle event_log id=%d (kind=%s) — no back-reference "
+                "or different turn (triggered_by=%r vs superseded=%d)",
+                el_id,
+                el_kind,
+                triggered_by,
+                original_assistant_event_id,
+            )
+            if triggered_by is None:
+                skipped_no_backref.append(el_id)
+            continue
+        prior_status = _PRIOR_STATUS_MAP.get(el_kind)
+        if prior_status is None:
+            # Defensive: the SQL filter already restricts to the three
+            # known kinds, but a future schema addition shouldn't crash
+            # the rollback path.
+            continue
+        target_event_id = lifecycle_payload.get("event_id")
+        if target_event_id is None:
+            continue
+        append_and_apply(
+            conn,
+            kind="event_status_reverted",
+            payload={
+                "event_id": target_event_id,
+                "prior_status": prior_status,
+            },
+        )
+        rolled_back_ids.append(el_id)
+    if rolled_back_ids:
+        _log.info(
+            "regenerate_assistant_turn: rolled back %d lifecycle "
+            "transition(s) triggered by superseded turn %s "
+            "(event_log ids: %s)",
+            len(rolled_back_ids),
+            original_assistant_event_id,
+            sorted(rolled_back_ids),
+        )
+    if skipped_no_backref:
+        # T83.4 (legacy) compatibility: still warn about stragglers
+        # without the back-reference so operators can spot pre-T114
+        # double-emit risks. Phrased as "at-or-after turn " per
+        # T90.2 — older transitions may legitimately belong to other
+        # turns.
         _log.warning(
             "regenerate_assistant_turn: %d lifecycle transition(s) "
-            "at-or-after turn %s are NOT being rolled back (Phase 4 "
-            "follow-up). Affected event ids: %s",
-            len(unrolled_lifecycle),
+            "at-or-after turn %s are NOT being rolled back (no "
+            "triggered_by_assistant_turn_id back-reference). "
+            "Affected event ids: %s",
+            len(skipped_no_backref),
             original_assistant_event_id,
-            [r[0] for r in unrolled_lifecycle],
+            sorted(skipped_no_backref),
         )
 
     # 1a. Look up any sibling interjection beat in the same turn group
@@ -716,11 +799,13 @@ async def regenerate_assistant_turn(
     # runs inline after a completion so promotion artifacts land in the
     # same regenerate path.
     #
-    # T83.4 follow-up: when a regenerate replaces a turn that had
-    # already produced event transitions, those original transitions
-    # are NOT undone here (Phase 4 work). A WARNING log earlier in this
-    # function names the affected event_log ids — see the T83.4 block
-    # near the function entry.
+    # T114.3: original-turn transitions emitted before this regenerate
+    # ran were rolled back at the top of the function (see the
+    # ``# T114.3`` block) by appending ``event_status_reverted`` for
+    # each. The classify-and-emit pass below now operates against an
+    # ``events`` projection that has already been reverted, so it can
+    # safely re-fire transitions for the regenerated narrative without
+    # double-emitting promotion artifacts.
     new_active_events = list_active_events(conn, chat_id)
     if new_active_events:
         lifecycle_decision = await detect_event_transitions(
diff --git a/tests/test_regenerate.py b/tests/test_regenerate.py
index b6d5e92..88e7422 100644
--- a/tests/test_regenerate.py
+++ b/tests/test_regenerate.py
@@ -1022,3 +1022,346 @@ def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch):
     assert isinstance(in_flight_snapshot.get("task"), asyncio.Task)
     # Post-flight: the entry has been cleaned up.
     assert "chat_bot_a" not in _in_flight_tasks
+
+
+# ---------------------------------------------------------------------------
+# T114: lifecycle rollback. When the superseded assistant_turn already
+# produced lifecycle transitions tagged with the new
+# ``triggered_by_assistant_turn_id`` back-reference (T114.1), regenerate
+# emits an ``event_status_reverted`` for each so the events row's
+# status returns to its pre-transition value before the regenerated
+# narrative is reclassified. Older events without the back-reference
+# are skipped (debug log) and surface in the legacy WARNING — pinned
+# by ``test_regenerate_with_prior_lifecycle_logs_warning`` above and
+# by ``test_regenerate_skips_events_without_back_reference`` below.
+# ---------------------------------------------------------------------------
+
+
+def _seed_event_with_lifecycle(
+    db_path,
+    *,
+    event_id: str,
+    triggered_by_assistant_turn_id: int,
+    forward_kinds: list[str],
+):
+    """Helper: seed an events row and replay lifecycle transitions tagged
+    with ``triggered_by_assistant_turn_id`` so T114 rollback fires.
+
+    ``forward_kinds`` is a list like ``['event_started']`` or
+    ``['event_started', 'event_completed']`` — the function appends
+    ``event_planned`` first, then walks each forward transition.
+    """
+    from chat.eventlog.log import append_and_apply
+
+    with open_db(db_path) as conn:
+        append_and_apply(
+            conn,
+            kind="event_planned",
+            payload={
+                "event_id": event_id,
+                "chat_id": "chat_bot_a",
+                "kind": "story_event",
+                "props": {},
+                "planned_for": "2026-04-30T18:00:00+00:00",
+            },
+        )
+        for kind in forward_kinds:
+            payload: dict = {
+                "event_id": event_id,
+                "triggered_by_assistant_turn_id": (
+                    triggered_by_assistant_turn_id
+                ),
+            }
+            if kind == "event_started":
+                payload["started_at"] = "2026-04-30T19:00:00+00:00"
+            else:
+                payload["completed_at"] = "2026-04-30T19:30:00+00:00"
+            append_and_apply(conn, kind=kind, payload=payload)
+
+
+def test_regenerate_rolls_back_event_started_from_superseded_turn(
+    tmp_path, monkeypatch
+):
+    """T114.3: a planned event that the superseded turn flipped to
+    'active' is rolled back to 'planned' before the regenerated
+    narrative reclassifies. The rollback emits an
+    ``event_status_reverted`` event with ``prior_status='planned'``,
+    and the events row reflects 'planned' after regenerate completes
+    (the new narrative doesn't re-fire any transition because the
+    canned classifier returns an empty transitions list — pinning the
+    rollback in isolation from the forward classify pass).
+    """
+    import asyncio
+
+    from chat.config import Settings
+    from chat.db.migrate import apply_migrations
+    from chat.services.regenerate import regenerate_assistant_turn
+
+    db_path = tmp_path / "test.db"
+    cfg = tmp_path / "config.toml"
+    cfg.write_text('featherless_api_key = "test"\n')
+    monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
+    monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
+    apply_migrations(db_path)
+
+    _ut_id, at_id = _seed_with_one_turn(db_path)
+    _seed_event_with_lifecycle(
+        db_path,
+        event_id="evt_started",
+        triggered_by_assistant_turn_id=at_id,
+        forward_kinds=["event_started"],
+    )
+
+    # Sanity: events row is currently 'active'.
+    with open_db(db_path) as conn:
+        status = conn.execute(
+            "SELECT status FROM events WHERE event_id = ?", ("evt_started",)
+        ).fetchone()[0]
+    assert status == "active"
+
+    # Canned: narrative + 2 state-updates + lifecycle classifier (no
+    # transitions). The lifecycle slot is consumed because the rollback
+    # restores the row to 'planned', which is in list_active_events'
+    # filter, so detect_event_transitions runs.
+    state_canned = json.dumps(
+        {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
+    )
+    no_transitions = json.dumps({"transitions": []})
+    mock_client = MockLLMClient(
+        canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
+    )
+    settings = Settings(featherless_api_key="test")
+
+    with open_db(db_path) as conn:
+        asyncio.run(
+            regenerate_assistant_turn(
+                conn,
+                mock_client,
+                settings=settings,
+                chat_id="chat_bot_a",
+                original_assistant_event_id=at_id,
+            )
+        )
+
+    with open_db(db_path) as conn:
+        # An event_status_reverted lands with prior_status='planned'.
+        rev_rows = conn.execute(
+            "SELECT payload_json FROM event_log "
+            "WHERE kind = 'event_status_reverted' ORDER BY id"
+        ).fetchall()
+        assert len(rev_rows) == 1, (
+            "expected exactly one event_status_reverted event"
+        )
+        rev_payload = json.loads(rev_rows[0][0])
+        assert rev_payload["event_id"] == "evt_started"
+        assert rev_payload["prior_status"] == "planned"
+
+        # Events projection: status is back to 'planned'.
+        status = conn.execute(
+            "SELECT status FROM events WHERE event_id = ?",
+            ("evt_started",),
+        ).fetchone()[0]
+        assert status == "planned"
+
+
+def test_regenerate_unwinds_started_then_completed(tmp_path, monkeypatch):
+    """T114.3: when the superseded turn authored BOTH the start and the
+    completion of an event, rollback unwinds newest-first: the
+    completion is reverted to 'active', then the start to 'planned',
+    so the row ends on its true pre-turn status ('planned').
+    """
+    import asyncio
+
+    from chat.config import Settings
+    from chat.db.migrate import apply_migrations
+    from chat.services.regenerate import regenerate_assistant_turn
+
+    db_path = tmp_path / "test.db"
+    cfg = tmp_path / "config.toml"
+    cfg.write_text('featherless_api_key = "test"\n')
+    monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
+    monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
+    apply_migrations(db_path)
+
+    _ut_id, at_id = _seed_with_one_turn(db_path)
+    # The forward sequence here pretends the prior turn ALSO authored
+    # the start (which is realistic — a single turn flow could go
+    # planned → active → completed across multiple events). Tagging
+    # both with the same back-reference exercises the multi-rollback
+    # loop (one per affected lifecycle row).
+    _seed_event_with_lifecycle(
+        db_path,
+        event_id="evt_completed",
+        triggered_by_assistant_turn_id=at_id,
+        forward_kinds=["event_started", "event_completed"],
+    )
+
+    # Sanity: events row is 'completed'.
+    with open_db(db_path) as conn:
+        status = conn.execute(
+            "SELECT status FROM events WHERE event_id = ?", ("evt_completed",)
+        ).fetchone()[0]
+    assert status == "completed"
+
+    state_canned = json.dumps(
+        {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
+    )
+    no_transitions = json.dumps({"transitions": []})
+    mock_client = MockLLMClient(
+        canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
+    )
+    settings = Settings(featherless_api_key="test")
+
+    with open_db(db_path) as conn:
+        asyncio.run(
+            regenerate_assistant_turn(
+                conn,
+                mock_client,
+                settings=settings,
+                chat_id="chat_bot_a",
+                original_assistant_event_id=at_id,
+            )
+        )
+
+    with open_db(db_path) as conn:
+        # Two event_status_reverted rows land — one per forward
+        # transition that carried the back-reference. Both target the
+        # same event_id and are emitted newest-first, i.e. in reverse
+        # event_log id order: completed→active, then started→planned.
+        rev_rows = conn.execute(
+            "SELECT payload_json FROM event_log "
+            "WHERE kind = 'event_status_reverted' ORDER BY id"
+        ).fetchall()
+        assert len(rev_rows) == 2
+        rev_payloads = [json.loads(r[0]) for r in rev_rows]
+        assert rev_payloads[0] == {
+            "event_id": "evt_completed",
+            "prior_status": "active",
+        }
+        assert rev_payloads[1] == {
+            "event_id": "evt_completed",
+            "prior_status": "planned",
+        }
+
+        # Events projection: the LAST applied event_status_reverted
+        # wins (planned) — the status the row held before the
+        # superseded turn fired its started+completed double-step.
+        status = conn.execute(
+            "SELECT status FROM events WHERE event_id = ?",
+            ("evt_completed",),
+        ).fetchone()[0]
+        assert status == "planned"
+
+
+def test_regenerate_skips_events_without_back_reference(
+    tmp_path, monkeypatch, caplog
+):
+    """T114.3 backward compatibility: lifecycle events authored before
+    T114.1 lack the ``triggered_by_assistant_turn_id`` payload field.
+    Regenerate must NOT emit ``event_status_reverted`` for such rows —
+    they're skipped (with a DEBUG log). The legacy T83.4 WARNING about
+    un-rolled-back transitions still fires for visibility.
+    """
+    import asyncio
+    import logging
+
+    from chat.config import Settings
+    from chat.db.migrate import apply_migrations
+    from chat.eventlog.log import append_and_apply
+    from chat.services.regenerate import regenerate_assistant_turn
+
+    db_path = tmp_path / "test.db"
+    cfg = tmp_path / "config.toml"
+    cfg.write_text('featherless_api_key = "test"\n')
+    monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
+    monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
+    apply_migrations(db_path)
+
+    _ut_id, at_id = _seed_with_one_turn(db_path)
+
+    # Seed a lifecycle transition WITHOUT the back-reference field —
+    # mimicking pre-T114.1 event_log rows.
+    with open_db(db_path) as conn:
+        append_and_apply(
+            conn,
+            kind="event_planned",
+            payload={
+                "event_id": "evt_legacy",
+                "chat_id": "chat_bot_a",
+                "kind": "story_event",
+                "props": {},
+                "planned_for": "2026-04-30T18:00:00+00:00",
+            },
+        )
+        append_and_apply(
+            conn,
+            kind="event_started",
+            payload={
+                "event_id": "evt_legacy",
+                "started_at": "2026-04-30T19:00:00+00:00",
+                # NOTE: no triggered_by_assistant_turn_id — pre-T114.1
+                # legacy row.
+            },
+        )
+
+    state_canned = json.dumps(
+        {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
+    )
+    no_transitions = json.dumps({"transitions": []})
+    mock_client = MockLLMClient(
+        canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
+    )
+    settings = Settings(featherless_api_key="test")
+
+    caplog.set_level(logging.DEBUG, logger="chat.services.regenerate")
+
+    with open_db(db_path) as conn:
+        asyncio.run(
+            regenerate_assistant_turn(
+                conn,
+                mock_client,
+                settings=settings,
+                chat_id="chat_bot_a",
+                original_assistant_event_id=at_id,
+            )
+        )
+
+    with open_db(db_path) as conn:
+        # No event_status_reverted was emitted for the legacy row.
+        rev_count = conn.execute(
+            "SELECT COUNT(*) FROM event_log "
+            "WHERE kind = 'event_status_reverted'"
+        ).fetchone()[0]
+        assert rev_count == 0
+
+        # Events row is still 'active' — the legacy transition stands.
+        status = conn.execute(
+            "SELECT status FROM events WHERE event_id = ?",
+            ("evt_legacy",),
+        ).fetchone()[0]
+        assert status == "active"
+
+    # Debug log surfaces the skipped row.
+    debugs = [
+        r.getMessage()
+        for r in caplog.records
+        if r.levelname == "DEBUG"
+    ]
+    assert any(
+        "skipping rollback for lifecycle event_log" in m for m in debugs
+    ), f"expected DEBUG about skipped legacy row; got: {debugs}"
+
+    # Legacy WARNING still fires so operators see un-rolled-back rows.
+    warnings = [
+        r.getMessage()
+        for r in caplog.records
+        if r.levelname == "WARNING"
+        and "lifecycle transition" in r.getMessage()
+    ]
+    assert warnings, (
+        "expected WARNING about un-rolled-back legacy lifecycle "
+        f"transitions; got records: "
+        f"{[r.getMessage() for r in caplog.records]}"
+    )
+    # The new wording references the missing back-reference field.
+    assert "triggered_by_assistant_turn_id" in warnings[0]