diff --git a/chat/services/regenerate.py b/chat/services/regenerate.py index 6427092..1317903 100644 --- a/chat/services/regenerate.py +++ b/chat/services/regenerate.py @@ -73,12 +73,15 @@ from sqlite3 import Connection from chat.config import Settings from chat.eventlog.log import append_and_apply, append_event +from chat.services.event_lifecycle import detect_event_transitions +from chat.services.event_promotion import promote_completed_event from chat.services.interjection import detect_interjection from chat.services.memory_write import record_turn_memory_for_present from chat.services.multi_state_update import compute_state_updates_for_present from chat.services.prompt import assemble_narrative_prompt from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you +from chat.state.events import list_active_events from chat.state.world import active_scene, get_chat from chat.web.pubsub import publish from chat.web.render import render_turn_html @@ -617,6 +620,67 @@ async def regenerate_assistant_turn( (new_assistant_event_id, original_interjection_event_id), ) + # 10. Event-lifecycle detection (Phase 3, T61). Mirrors the post_turn + # block: classify whether any active events transitioned in the + # regenerated narrative and append the corresponding event_started / + # event_completed / event_cancelled. ``promote_completed_event`` + # runs inline after a completion so promotion artifacts land in the + # same regenerate path. + # + # Phase 3.5 follow-up: when a regenerate replaces a turn that had + # already produced event transitions, those original transitions are + # NOT undone here. The superseded ``assistant_turn`` group keeps its + # prior ``event_started`` / ``event_completed`` events in the log + # (they remain projected onto the events table). Phase 3.5 will add + # an "undo lifecycle" step to roll back the prior transitions before + # re-classifying the regenerated text. For v3 we accept that a + # regenerate-after-completion will double-emit promotion artifacts + # if the new text re-completes the same event — narratively rare, + # and a true fix needs the lifecycle-undo pass. + new_active_events = list_active_events(conn, chat_id) + if new_active_events: + lifecycle_decision = await detect_event_transitions( + client, + classifier_model=settings.classifier_model, + narrative_text=new_text, + active_events=new_active_events, + timeout_s=settings.classifier_timeout_s, + ) + for transition in lifecycle_decision.transitions: + if transition.new_status == "active": + append_and_apply( + conn, + kind="event_started", + payload={ + "event_id": transition.event_id, + "started_at": chat.get("time"), + }, + ) + elif transition.new_status == "completed": + append_and_apply( + conn, + kind="event_completed", + payload={ + "event_id": transition.event_id, + "completed_at": chat.get("time"), + }, + ) + promote_completed_event( + conn, + event_id=transition.event_id, + chat_id=chat_id, + chat_clock_at=chat.get("time"), + ) + elif transition.new_status == "cancelled": + append_and_apply( + conn, + kind="event_cancelled", + payload={ + "event_id": transition.event_id, + "completed_at": chat.get("time"), + }, + ) + return new_text diff --git a/chat/web/turns.py b/chat/web/turns.py index b3a4f0e..5ef6725 100644 --- a/chat/web/turns.py +++ b/chat/web/turns.py @@ -57,6 +57,8 @@ from fastapi.responses import HTMLResponse, RedirectResponse, Response from chat.eventlog.log import append_and_apply, append_event from chat.services.addressee import detect_addressee from chat.services.background import SignificanceJob +from chat.services.event_lifecycle import detect_event_transitions +from chat.services.event_promotion import promote_completed_event from chat.services.interjection import detect_interjection from chat.services.memory_write import record_turn_memory_for_present from chat.services.multi_state_update import compute_state_updates_for_present @@ -67,6 +69,7 @@ from chat.services.scene_summarize import apply_scene_close_summary from chat.services.turn_parse import ParsedTurn, parse_turn from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you +from chat.state.events import list_active_events from chat.state.world import active_scene, get_chat, get_container from chat.web.bots import get_conn from chat.web.kickoff import get_llm_client @@ -654,6 +657,76 @@ async def post_turn( ) ) + # 8a. Event-lifecycle detection (Phase 3, T61). Runs after the post-turn + # classifier passes (memory write + state update + optional + # interjection) and BEFORE scene-close detection. The classifier reads + # ``primary_text`` against the chat's currently-active events and + # returns a (usually empty) list of transitions. Each transition lands + # an ``event_started`` / ``event_completed`` / ``event_cancelled`` + # event via ``append_and_apply`` so the events projection updates + # synchronously. A completion is followed inline by + # ``promote_completed_event`` so any structured artifacts the event + # carries (knowledge_facts, relationship_change, acquired_objects) + # land in state in the same turn — see chat/services/event_promotion. + # + # ``detect_event_transitions`` short-circuits when ``active_events`` + # is empty (per T52), so chats without active events don't pay a + # classifier round-trip and existing fixtures need no extra canned + # slots. + active_events = list_active_events(conn, chat_id) + if active_events: + lifecycle_decision = await detect_event_transitions( + client, + classifier_model=settings.classifier_model, + narrative_text=primary_text, + active_events=active_events, + timeout_s=settings.classifier_timeout_s, + ) + for transition in lifecycle_decision.transitions: + if transition.new_status == "active": + append_and_apply( + conn, + kind="event_started", + payload={ + "event_id": transition.event_id, + "started_at": chat.get("time"), + }, + ) + elif transition.new_status == "completed": + append_and_apply( + conn, + kind="event_completed", + payload={ + "event_id": transition.event_id, + "completed_at": chat.get("time"), + }, + ) + # Run promotion inline so the artifact-emitting events + # (edge_update / manual_edit) land synchronously after + # the completion. ``promote_completed_event`` is + # synchronous (no await) and skips silently when the + # event row's status isn't 'completed' — a safety net + # for races, not expected to trigger in practice. + promote_completed_event( + conn, + event_id=transition.event_id, + chat_id=chat_id, + chat_clock_at=chat.get("time"), + ) + elif transition.new_status == "cancelled": + append_and_apply( + conn, + kind="event_cancelled", + payload={ + "event_id": transition.event_id, + "completed_at": chat.get("time"), + }, + ) + # Any other ``new_status`` value falls through silently — + # the lifecycle service constrains the schema to the three + # valid transitions, and a defensive no-op here keeps the + # turn flow tolerant of unexpected outputs. + # 9. Scene-close detection (Plan §7.2, T26). Runs AFTER assistant_turn # and the optional interjection so the bots' responses are part of # the closing scene's final beat — closing before narrative would diff --git a/tests/test_turn_flow.py b/tests/test_turn_flow.py index a30ec24..a202e2d 100644 --- a/tests/test_turn_flow.py +++ b/tests/test_turn_flow.py @@ -19,7 +19,7 @@ from fastapi.testclient import TestClient from chat.app import app from chat.db.connection import open_db -from chat.eventlog.log import append_event +from chat.eventlog.log import append_and_apply, append_event from chat.eventlog.projector import project from chat.llm.mock import MockLLMClient @@ -896,3 +896,244 @@ def test_interjection_enqueues_significance_job(app_state_setup, tmp_path): # The two narrative texts should be the two streamed beats. narrative_texts = sorted(job.narrative_text for job in captured_jobs) assert narrative_texts == ["Interjection beat!", "Primary beat."] + + +# --------------------------------------------------------------------------- +# Phase 3 (T61) — per-turn event-lifecycle detection + completion promotion. +# +# After the post-turn classifier passes (memory write, state update, +# interjection check) and BEFORE scene-close detection, ``post_turn`` +# calls :func:`detect_event_transitions`. Each transition becomes one +# of ``event_started`` / ``event_completed`` / ``event_cancelled``. A +# completed event is followed inline by ``promote_completed_event`` so +# the props it carries (knowledge_facts, etc.) land in state +# synchronously. +# +# When no active events are seeded the classifier short-circuits without +# an LLM call (per T52) — the canned queue therefore needs ZERO extra +# slots in that case. +# --------------------------------------------------------------------------- + + +def test_turn_with_event_transition_appends_started_event( + app_state_setup, tmp_path +): + """A planned event becomes active when the classifier reports a + ``new_status='active'`` transition for that event_id. + + Canned queue (5 calls — single-bot, no scene seeded): + 1. parse_turn + 2. narrative stream + 3. state-update bot_a -> you + 4. state-update you -> bot_a + 5. detect_event_transitions -> 1 transition (active) + """ + _seed(tmp_path / "test.db") + # Seed a planned event so list_active_events returns 1 row. Use + # append_and_apply so we don't re-replay the prior chat_created event + # (whose handler is INSERT-not-IGNORE and would 409 on replay). + with open_db(tmp_path / "test.db") as conn: + append_and_apply( + conn, + kind="event_planned", + payload={ + "event_id": "evt_1", + "chat_id": "chat_bot_a", + "kind": "story_event", + "props": {}, + "planned_for": "2026-04-30T18:00:00+00:00", + }, + ) + + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "they arrived"}]} + ) + canned_event_decision = json.dumps( + { + "transitions": [ + { + "event_id": "evt_1", + "new_status": "active", + "reason": "they arrived", + } + ] + } + ) + mock = _override_llm( + [ + canned_parse, + "They walk in.", + _zero_state(), + _zero_state(), + canned_event_decision, + ] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "they arrived"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + # All 5 canned slots consumed. + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + # event_started landed in event_log. + rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'event_started' ORDER BY id" + ).fetchall() + assert len(rows) == 1 + started_payload = json.loads(rows[0][0]) + assert started_payload["event_id"] == "evt_1" + assert started_payload["started_at"] == "2026-04-26T20:00:00+00:00" + + # The events projection row reflects the active status. + ev_row = conn.execute( + "SELECT status, started_at FROM events WHERE event_id = ?", + ("evt_1",), + ).fetchone() + assert ev_row is not None + assert ev_row[0] == "active" + assert ev_row[1] == "2026-04-26T20:00:00+00:00" + + +def test_turn_with_event_completion_runs_promotion(app_state_setup, tmp_path): + """An active event with knowledge_facts in props completes; the + inline call to ``promote_completed_event`` emits the corresponding + ``edge_update``. + """ + _seed(tmp_path / "test.db") + # Seed: planned -> started so the event is currently active. Props + # carry a knowledge_fact that promotion will turn into an edge_update. + # Use append_and_apply (not project) to avoid re-replaying chat_created. + with open_db(tmp_path / "test.db") as conn: + append_and_apply( + conn, + kind="event_planned", + payload={ + "event_id": "evt_2", + "chat_id": "chat_bot_a", + "kind": "story_event", + "props": { + "knowledge_facts": [ + { + "owner_id": "bot_a", + "target_id": "you", + "fact": "Maya likes pottery", + } + ] + }, + "planned_for": "2026-04-30T18:00:00+00:00", + }, + ) + append_and_apply( + conn, + kind="event_started", + payload={ + "event_id": "evt_2", + "started_at": "2026-04-30T19:00:00+00:00", + }, + ) + + # Snapshot the max event_log id so we can assert on rows AFTER the turn. + with open_db(tmp_path / "test.db") as conn: + before_id = conn.execute( + "SELECT COALESCE(MAX(id), 0) FROM event_log" + ).fetchone()[0] + + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "we wrap it up"}]} + ) + canned_event_decision = json.dumps( + { + "transitions": [ + { + "event_id": "evt_2", + "new_status": "completed", + "reason": "wrapped", + } + ] + } + ) + mock = _override_llm( + [ + canned_parse, + "They wrap it up.", + _zero_state(), + _zero_state(), + canned_event_decision, + ] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "we wrap it up"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + # event_completed landed. + completed_rows = conn.execute( + "SELECT id, payload_json FROM event_log " + "WHERE kind = 'event_completed' AND id > ? ORDER BY id", + (before_id,), + ).fetchall() + assert len(completed_rows) == 1 + completed_payload = json.loads(completed_rows[0][1]) + assert completed_payload["event_id"] == "evt_2" + completed_id = completed_rows[0][0] + + # promote_completed_event ran inline AFTER event_completed: the + # follow-on edge_update carries the knowledge fact and is tagged + # with source=event_promotion. + promo_rows = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'edge_update' AND id > ? ORDER BY id", + (completed_id,), + ).fetchall() + promo_facts: list[str] = [] + for (payload_json,) in promo_rows: + p = json.loads(payload_json) + if p.get("source") == "event_promotion": + promo_facts.extend(p.get("knowledge_facts") or []) + + assert "Maya likes pottery" in promo_facts + + +def test_turn_with_no_active_events_skips_classifier(app_state_setup, tmp_path): + """When no active events are seeded, ``detect_event_transitions`` + short-circuits without an LLM call (per T52). The canned queue must + therefore have ZERO event-detection slots — same shape as the + Phase 2 no-guest baseline. + """ + _seed(tmp_path / "test.db") + + canned_parse = json.dumps( + {"segments": [{"kind": "dialogue", "text": "hello"}]} + ) + # Only 4 slots: parse + narrative + 2 state-updates. NO extra slot for + # event-detection — non-existent active_events causes the helper to + # short-circuit before pulling from the queue. + mock = _override_llm( + [canned_parse, "Hi there.", _zero_state(), _zero_state()] + ) + try: + response = app_state_setup.post( + "/chats/chat_bot_a/turns", data={"prose": "hello"} + ) + assert response.status_code == 204 + finally: + app.dependency_overrides.clear() + # Queue fully drained — no canned slot was consumed by event detection. + assert mock._canned == [] + + with open_db(tmp_path / "test.db") as conn: + for kind in ("event_started", "event_completed", "event_cancelled"): + count = conn.execute( + "SELECT COUNT(*) FROM event_log WHERE kind = ?", (kind,) + ).fetchone()[0] + assert count == 0, f"expected zero {kind} events, got {count}"