feat: regenerate rolls back lifecycle transitions on supersede (T114.3)

Closes the T83.4 gap: when ``regenerate_assistant_turn`` supersedes an
assistant_turn that already produced lifecycle transitions, it now
emits an ``event_status_reverted`` (T114.2) for each transition tagged
with ``triggered_by_assistant_turn_id == original_assistant_event_id``
(T114.1 back-reference) before the regenerated narrative is
reclassified.

Mapping from forward kind to ``prior_status`` lives in
``_PRIOR_STATUS_MAP``:
  - event_started   → planned
  - event_completed → active
  - event_cancelled → active (best-effort default; cancellation can fire
    from either planned or active, but detect_event_transitions only
    surfaces currently-active rows so 'active' is the realistic prior)

Backward compatibility: lifecycle rows authored before T114.1 lack the
back-reference field. Those are skipped (DEBUG log per row) and
collected into a legacy WARNING that preserves the T83.4
observability contract — operators still see un-rolled-back
transitions, just from older logs.

The classify-and-emit pass below the rollback now operates against an
events projection that has already been reverted, so re-firing
``event_started``/``event_completed``/``event_cancelled`` for the
regenerated narrative is safe — no double-emit of promotion artifacts.

Spec tests:
- ``test_regenerate_rolls_back_event_started_from_superseded_turn``
- ``test_regenerate_rolls_back_event_completed_to_active`` (also
  exercises the multi-rollback loop: a turn that fired both a start
  and a completion gets two event_status_reverted rows in id order,
  with active as the final projection — matching the per-row replay
  semantics of the projector)
- ``test_regenerate_skips_events_without_back_reference`` (pins the
  legacy compatibility path with both DEBUG and WARNING expectations)
This commit is contained in:
Joseph Doherty
2026-04-27 06:45:43 -04:00
parent 6d4ad86e33
commit 80ce891bd8
2 changed files with 459 additions and 36 deletions
+343
View File
@@ -1022,3 +1022,346 @@ def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch):
assert isinstance(in_flight_snapshot.get("task"), asyncio.Task)
# Post-flight: the entry has been cleaned up.
assert "chat_bot_a" not in _in_flight_tasks
# ---------------------------------------------------------------------------
# T114: lifecycle rollback. When the superseded assistant_turn already
# produced lifecycle transitions tagged with the new
# ``triggered_by_assistant_turn_id`` back-reference (T114.1), regenerate
# emits an ``event_status_reverted`` for each so the events row's
# status returns to its pre-transition value before the regenerated
# narrative is reclassified. Older events without the back-reference
# are skipped (debug log) and surface in the legacy WARNING — pinned
# by ``test_regenerate_with_prior_lifecycle_logs_warning`` above and
# by ``test_regenerate_skips_events_without_back_reference`` below.
# ---------------------------------------------------------------------------
def _seed_event_with_lifecycle(
db_path,
*,
event_id: str,
triggered_by_assistant_turn_id: int,
forward_kinds: list[str],
):
"""Helper: seed an events row and replay lifecycle transitions tagged
with ``triggered_by_assistant_turn_id`` so T114 rollback fires.
``forward_kinds`` is a list like ``['event_started']`` or
``['event_started', 'event_completed']`` — the function appends
``event_planned`` first, then walks each forward transition.
"""
from chat.eventlog.log import append_and_apply
with open_db(db_path) as conn:
append_and_apply(
conn,
kind="event_planned",
payload={
"event_id": event_id,
"chat_id": "chat_bot_a",
"kind": "story_event",
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
for kind in forward_kinds:
payload: dict = {
"event_id": event_id,
"triggered_by_assistant_turn_id": (
triggered_by_assistant_turn_id
),
}
if kind == "event_started":
payload["started_at"] = "2026-04-30T19:00:00+00:00"
else:
payload["completed_at"] = "2026-04-30T19:30:00+00:00"
append_and_apply(conn, kind=kind, payload=payload)
def test_regenerate_rolls_back_event_started_from_superseded_turn(
tmp_path, monkeypatch
):
"""T114.3: a planned event that the superseded turn flipped to
'active' is rolled back to 'planned' before the regenerated
narrative reclassifies. The rollback emits an
``event_status_reverted`` event with ``prior_status='planned'``,
and the events row reflects 'planned' after regenerate completes
(the new narrative doesn't re-fire any transition because the
canned classifier returns an empty transitions list — pinning the
rollback in isolation from the forward classify pass).
"""
import asyncio
from chat.config import Settings
from chat.db.migrate import apply_migrations
from chat.services.regenerate import regenerate_assistant_turn
db_path = tmp_path / "test.db"
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
apply_migrations(db_path)
_ut_id, at_id = _seed_with_one_turn(db_path)
_seed_event_with_lifecycle(
db_path,
event_id="evt_started",
triggered_by_assistant_turn_id=at_id,
forward_kinds=["event_started"],
)
# Sanity: events row is currently 'active'.
with open_db(db_path) as conn:
status = conn.execute(
"SELECT status FROM events WHERE event_id = ?", ("evt_started",)
).fetchone()[0]
assert status == "active"
# Canned: narrative + 2 state-updates + lifecycle classifier (no
# transitions). The lifecycle slot is consumed because the rollback
# restores the row to 'planned', which is in list_active_events'
# filter, so detect_event_transitions runs.
state_canned = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
no_transitions = json.dumps({"transitions": []})
mock_client = MockLLMClient(
canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
)
settings = Settings(featherless_api_key="test")
with open_db(db_path) as conn:
asyncio.run(
regenerate_assistant_turn(
conn,
mock_client,
settings=settings,
chat_id="chat_bot_a",
original_assistant_event_id=at_id,
)
)
with open_db(db_path) as conn:
# An event_status_reverted lands with prior_status='planned'.
rev_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'event_status_reverted' ORDER BY id"
).fetchall()
assert len(rev_rows) == 1, (
"expected exactly one event_status_reverted event"
)
rev_payload = json.loads(rev_rows[0][0])
assert rev_payload["event_id"] == "evt_started"
assert rev_payload["prior_status"] == "planned"
# Events projection: status is back to 'planned'.
status = conn.execute(
"SELECT status FROM events WHERE event_id = ?",
("evt_started",),
).fetchone()[0]
assert status == "planned"
def test_regenerate_rolls_back_event_completed_to_active(tmp_path, monkeypatch):
"""T114.3: a completed event whose completion was triggered by the
superseded turn rolls back to 'active'. Mirrors the started→planned
case but exercises the 'completed → active' branch of
``_PRIOR_STATUS_MAP`` in regenerate.
"""
import asyncio
from chat.config import Settings
from chat.db.migrate import apply_migrations
from chat.services.regenerate import regenerate_assistant_turn
db_path = tmp_path / "test.db"
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
apply_migrations(db_path)
_ut_id, at_id = _seed_with_one_turn(db_path)
# The forward sequence here pretends the prior turn ALSO authored
# the start (which is realistic — a single turn flow could go
# planned → active → completed across multiple events). Tagging
# both with the same back-reference exercises the multi-rollback
# loop (one per affected lifecycle row).
_seed_event_with_lifecycle(
db_path,
event_id="evt_completed",
triggered_by_assistant_turn_id=at_id,
forward_kinds=["event_started", "event_completed"],
)
# Sanity: events row is 'completed'.
with open_db(db_path) as conn:
status = conn.execute(
"SELECT status FROM events WHERE event_id = ?", ("evt_completed",)
).fetchone()[0]
assert status == "completed"
state_canned = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
no_transitions = json.dumps({"transitions": []})
mock_client = MockLLMClient(
canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
)
settings = Settings(featherless_api_key="test")
with open_db(db_path) as conn:
asyncio.run(
regenerate_assistant_turn(
conn,
mock_client,
settings=settings,
chat_id="chat_bot_a",
original_assistant_event_id=at_id,
)
)
with open_db(db_path) as conn:
# Two event_status_reverted rows land — one per forward
# transition that carried the back-reference. Both target the
# same event_id but with different prior_status values
# (in event_log id order: started→planned, completed→active).
rev_rows = conn.execute(
"SELECT payload_json FROM event_log "
"WHERE kind = 'event_status_reverted' ORDER BY id"
).fetchall()
assert len(rev_rows) == 2
rev_payloads = [json.loads(r[0]) for r in rev_rows]
assert rev_payloads[0] == {
"event_id": "evt_completed",
"prior_status": "planned",
}
assert rev_payloads[1] == {
"event_id": "evt_completed",
"prior_status": "active",
}
# Events projection: the LAST applied event_status_reverted
# wins (active). That's the desired final state for a turn
# that was originally a started+completed double-step.
status = conn.execute(
"SELECT status FROM events WHERE event_id = ?",
("evt_completed",),
).fetchone()[0]
assert status == "active"
def test_regenerate_skips_events_without_back_reference(
tmp_path, monkeypatch, caplog
):
"""T114.3 backward compatibility: lifecycle events authored before
T114.1 lack the ``triggered_by_assistant_turn_id`` payload field.
Regenerate must NOT emit ``event_status_reverted`` for such rows —
they're skipped (with a DEBUG log). The legacy T83.4 WARNING about
un-rolled-back transitions still fires for visibility.
"""
import asyncio
import logging
from chat.config import Settings
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_and_apply
from chat.services.regenerate import regenerate_assistant_turn
db_path = tmp_path / "test.db"
cfg = tmp_path / "config.toml"
cfg.write_text('featherless_api_key = "test"\n')
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
apply_migrations(db_path)
_ut_id, at_id = _seed_with_one_turn(db_path)
# Seed a lifecycle transition WITHOUT the back-reference field —
# mimicking pre-T114.1 event_log rows.
with open_db(db_path) as conn:
append_and_apply(
conn,
kind="event_planned",
payload={
"event_id": "evt_legacy",
"chat_id": "chat_bot_a",
"kind": "story_event",
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
append_and_apply(
conn,
kind="event_started",
payload={
"event_id": "evt_legacy",
"started_at": "2026-04-30T19:00:00+00:00",
# NOTE: no triggered_by_assistant_turn_id — pre-T114.1
# legacy row.
},
)
state_canned = json.dumps(
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
)
no_transitions = json.dumps({"transitions": []})
mock_client = MockLLMClient(
canned=["Refreshed reply.", state_canned, state_canned, no_transitions]
)
settings = Settings(featherless_api_key="test")
caplog.set_level(logging.DEBUG, logger="chat.services.regenerate")
with open_db(db_path) as conn:
asyncio.run(
regenerate_assistant_turn(
conn,
mock_client,
settings=settings,
chat_id="chat_bot_a",
original_assistant_event_id=at_id,
)
)
with open_db(db_path) as conn:
# No event_status_reverted was emitted for the legacy row.
rev_count = conn.execute(
"SELECT COUNT(*) FROM event_log "
"WHERE kind = 'event_status_reverted'"
).fetchone()[0]
assert rev_count == 0
# Events row is still 'active' — the legacy transition stands.
status = conn.execute(
"SELECT status FROM events WHERE event_id = ?",
("evt_legacy",),
).fetchone()[0]
assert status == "active"
# Debug log surfaces the skipped row.
debugs = [
r.getMessage()
for r in caplog.records
if r.levelname == "DEBUG"
]
assert any(
"skipping rollback for lifecycle event_log" in m for m in debugs
), f"expected DEBUG about skipped legacy row; got: {debugs}"
# Legacy WARNING still fires so operators see un-rolled-back rows.
warnings = [
r.getMessage()
for r in caplog.records
if r.levelname == "WARNING"
and "lifecycle transition" in r.getMessage()
]
assert warnings, (
"expected WARNING about un-rolled-back legacy lifecycle "
f"transitions; got records: "
f"{[r.getMessage() for r in caplog.records]}"
)
# The new wording references the missing back-reference field.
assert "triggered_by_assistant_turn_id" in warnings[0]