diff --git a/chat/services/regenerate.py b/chat/services/regenerate.py index eff02a2..6427092 100644 --- a/chat/services/regenerate.py +++ b/chat/services/regenerate.py @@ -26,6 +26,7 @@ Phase 1 simplifications (per the plan's "bound it" guidance): so affinity/trust/knowledge reflect the new output. - The route does not broadcast a fresh ``turn_html`` SSE event; T34 polishes UI swaps. The user refreshes the page to see the new turn. + *(T73.1 closed this gap — see Phase 2.5 changes below.)* Phase 2 changes (T44): @@ -42,6 +43,27 @@ Phase 2 changes (T44): is not invoked here. If the prior turn fired an interjection it remains attached to the original assistant_turn (which is superseded alongside the regenerated turn) — Phase 2.5 will revisit. + +Phase 2.5 changes: + +- T73.1: After the new ``assistant_turn`` lands we publish a + ``turn_html_replace`` SSE event carrying the rendered HTML for the + regenerated turn plus the original assistant_turn's event_id as + ``supersedes_id`` so connected tabs can swap the prior DOM node + in-place. We use a NEW event name (rather than re-using ``turn_html``) + because the existing HTMX ``sse-swap="turn_html"`` consumer expects a + raw-HTML body and an *append* semantic; ``turn_html_replace`` is a + JSON payload (sse.py auto-serialises when extra keys accompany + ``data``) so the front-end JS can read ``supersedes_id`` and replace + the right node. +- T73.2: Interjection regeneration. When the original assistant_turn + group included an interjection beat we redo BOTH the primary and the + interjection — re-running ``detect_interjection`` against the new + primary text. If the classifier returns False this time we supersede + the original interjection without appending a replacement. +- T73.3: The defensive degrade-to-1:1 for stale ``guest_bot_id`` + references was removed — Phase 2 T47 fixed the root cause (resets + clear the reference) so the guard is dead code. """ from __future__ import annotations @@ -51,6 +73,7 @@ from sqlite3 import Connection from chat.config import Settings from chat.eventlog.log import append_and_apply, append_event +from chat.services.interjection import detect_interjection from chat.services.memory_write import record_turn_memory_for_present from chat.services.multi_state_update import compute_state_updates_for_present from chat.services.prompt import assemble_narrative_prompt @@ -58,6 +81,7 @@ from chat.state.edges import get_edge from chat.state.entities import get_bot, get_you from chat.state.world import active_scene, get_chat from chat.web.pubsub import publish +from chat.web.render import render_turn_html async def regenerate_assistant_turn( @@ -90,13 +114,13 @@ async def regenerate_assistant_turn( # Phase 2: surface the guest (if any) so the prompt assembler and # downstream multi-entity passes see the same shape post_turn does. + # Phase 2 T47 made bot_reset cascade-clear ``chat.guest_bot_id`` when + # the referenced bot is purged (verified by tests/test_reset.py), so + # we trust the column here: it's either a valid bot id or NULL. guest_bot_id = chat.get("guest_bot_id") - guest_bot: dict | None = None - if guest_bot_id is not None: - guest_bot = get_bot(conn, guest_bot_id) - if guest_bot is None: - # Stale guest reference — degrade to single-bot regenerate. - guest_bot_id = None + guest_bot: dict | None = ( + get_bot(conn, guest_bot_id) if guest_bot_id is not None else None + ) # 1. Locate the original assistant_turn event. row = conn.execute( @@ -108,6 +132,33 @@ async def regenerate_assistant_turn( raise ValueError("assistant_turn event not found") original_assistant_payload = json.loads(row[0]) original_user_turn_id = original_assistant_payload.get("user_turn_id") + + # 1a. Look up any sibling interjection beat in the same turn group + # (T73.2). The original group is (primary + optional interjection), + # both pinned to the same ``user_turn_id``. The interjection has a + # populated ``interjection_of`` field in its payload — its speaker is + # the silent witness (the bot that wasn't the primary addressee). + # Filter on ``superseded_by IS NULL`` so prior regenerates of this + # group don't reappear as siblings. + original_interjection_event_id: int | None = None + original_interjection_payload: dict | None = None + if original_user_turn_id is not None: + sibling_cur = conn.execute( + "SELECT id, payload_json FROM event_log " + "WHERE kind = 'assistant_turn' " + " AND id != ? " + " AND superseded_by IS NULL", + (original_assistant_event_id,), + ) + for sib_id, sib_payload_json in sibling_cur.fetchall(): + sib_payload = json.loads(sib_payload_json) + if sib_payload.get("user_turn_id") != original_user_turn_id: + continue + if not sib_payload.get("interjection_of"): + continue + original_interjection_event_id = sib_id + original_interjection_payload = sib_payload + break # Phase 2 v2 regenerates only the addressee turn — preserve whichever # bot the original turn was attributed to, falling back to the host # for legacy rows that pre-date multi-entity support. @@ -238,6 +289,27 @@ async def regenerate_assistant_turn( (new_assistant_event_id, original_assistant_event_id), ) + # 7a. Broadcast a turn_html_replace SSE event so connected tabs can + # swap the prior assistant_turn DOM node in-place (T73.1, Phase 1.5 + # backlog #2). Uses a separate event name from post_turn's + # ``turn_html`` (which is append-only) because regenerate is a + # *replace* operation — see module docstring for the rationale. + speaker_name_for_render = ( + speaker_bot.get("name", "bot") if speaker_bot is not None else "bot" + ) + new_turn_html = render_turn_html( + speaker_name_for_render, new_text, role="bot" + ) + await publish( + chat_id, + { + "event": "turn_html_replace", + "data": new_turn_html, + "turn_id": new_assistant_event_id, + "supersedes_id": original_assistant_event_id, + }, + ) + # 8. Re-run downstream classifier passes (memory write + state update # for every directed pair across present entities). Significance is # intentionally skipped on regenerate (the prior score remains @@ -317,6 +389,234 @@ async def regenerate_assistant_turn( }, ) + # 9. Interjection regenerate branch (T73.2). When the original + # assistant_turn group included a follow-on interjection beat we need + # to revisit that beat against the regenerated primary. Three outcomes: + # + # - No original interjection: nothing to do; we already short-circuit + # above by leaving ``original_interjection_event_id`` as None. + # - Original interjection + classifier returns True: stream a fresh + # interjection from the silent witness, append it (with + # ``interjection_of`` linking to the new primary speaker), and + # supersede the original interjection's row. Also re-run memory + # + state-update so the second beat moves edges + writes memories. + # - Original interjection + classifier returns False: supersede the + # original interjection without appending a replacement. The + # regenerated group becomes "primary only" because the new primary + # no longer warrants a follow-on. No memory / state work needed + # for the absent beat. + # + # ``superseded_by`` on the original interjection's row points at the + # *new primary* in the no-replacement case (rather than NULL or a + # nonexistent id) so the row is consistently hidden by the standard + # ``superseded_by IS NULL`` timeline filter and the back-pointer + # leads somewhere meaningful for an "originally said …" affordance. + if original_interjection_event_id is not None and guest_bot is not None: + # Identify the silent witness from the original interjection's + # speaker_id (which is the bot that interjected last time). When + # we regenerate we keep the *same pair of present entities*, so + # the silent witness is whichever bot isn't the new primary + # speaker — derive it from present rather than reusing the prior + # speaker_id verbatim, in case the regenerated primary swapped + # who held the floor. + if speaker_bot_id == host_bot_id: + silent_witness = guest_bot + else: + silent_witness = host_bot + silent_witness_id = silent_witness.get("id") + + edge_w_to_addr = get_edge(conn, silent_witness_id, speaker_bot_id) or { + "affinity": 50, + "trust": 50, + "summary": "", + } + edge_w_to_you = get_edge(conn, silent_witness_id, "you") or { + "affinity": 50, + "trust": 50, + "summary": "", + } + + decision = await detect_interjection( + client, + classifier_model=settings.classifier_model, + addressee_name=speaker_bot.get("name", "bot"), + addressee_just_said=new_text, + silent_witness_name=silent_witness.get("name", "bot"), + silent_witness_persona=silent_witness.get("persona") or "", + silent_witness_edge_to_addressee=edge_w_to_addr, + silent_witness_edge_to_you=edge_w_to_you, + you_just_said=prose_for_prompt or "", + timeout_s=settings.classifier_timeout_s, + ) + + if decision.should_interject: + # Re-read recent so the just-appended primary is in the prompt. + interject_cur = conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE kind IN ('user_turn', 'user_turn_edit', 'assistant_turn') " + " AND superseded_by IS NULL AND hidden = 0 " + "ORDER BY id DESC LIMIT 20", + ) + interject_rows = list(reversed(interject_cur.fetchall())) + interject_recent: list[dict] = [] + for _eid, kind, payload_json in interject_rows: + p = json.loads(payload_json) + if p.get("chat_id") != chat_id: + continue + if kind in ("user_turn", "user_turn_edit"): + interject_recent.append( + {"speaker": you_name, "text": p.get("prose", "")} + ) + else: + spk = p.get("speaker_id", "bot") + if spk == host_bot_id: + spk_name = host_bot.get("name", "bot") + elif spk == guest_bot.get("id"): + spk_name = guest_bot.get("name", "bot") + else: + spk_name = "bot" + interject_recent.append( + {"speaker": spk_name, "text": p.get("text", "")} + ) + if interject_recent and interject_recent[-1].get("speaker") == you_name: + interject_recent = interject_recent[:-1] + + interject_messages = assemble_narrative_prompt( + conn, + chat_id=chat_id, + speaker_bot_id=silent_witness_id, + addressee=speaker_bot_id, + user_turn_prose=prose_for_prompt or None, + recent_dialogue=interject_recent, + budget_soft=settings.narrative_budget_soft, + budget_hard=settings.narrative_budget_hard, + guest_id=guest_bot_id, + ) + + interject_accumulated: list[str] = [] + async for chunk in client.stream( + interject_messages, + model=settings.narrative_model, + max_tokens=settings.narrative_max_tokens, + temperature=settings.narrative_temperature, + ): + interject_accumulated.append(chunk) + await publish( + chat_id, + { + "event": "token", + "text": chunk, + "speaker_id": silent_witness_id, + }, + ) + interject_text = "".join(interject_accumulated) + + new_interjection_event_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": chat_id, + "speaker_id": silent_witness_id, + "text": interject_text, + "truncated": False, + "user_turn_id": ( + new_user_event_id + if new_user_event_id is not None + else original_user_turn_id + ), + "regenerated_from": original_interjection_event_id, + "interjection_of": speaker_bot_id, + }, + ) + + # Supersede the original interjection by the new one. + conn.execute( + "UPDATE event_log SET superseded_by = ? WHERE id = ?", + (new_interjection_event_id, original_interjection_event_id), + ) + + # Broadcast a replace event so connected tabs swap the prior + # interjection node in-place (mirrors T73.1's primary swap). + interject_html = render_turn_html( + silent_witness.get("name", "bot"), interject_text, role="bot" + ) + await publish( + chat_id, + { + "event": "turn_html_replace", + "data": interject_html, + "turn_id": new_interjection_event_id, + "supersedes_id": original_interjection_event_id, + }, + ) + + # Memory write for the new interjection beat (one event per + # present witness). + record_turn_memory_for_present( + conn, + chat_id=chat_id, + host_bot_id=host_bot_id, + guest_bot_id=guest_bot_id, + narrative_text=interject_text, + scene_id=scene["id"] if scene else None, + chat_clock_at=chat.get("time"), + ) + + # Re-run the multi-pair state-update with the post-interjection + # dialogue tail so deltas land on the post-primary baseline. + recent_post_interject = recent_for_update + [ + { + "speaker": silent_witness.get("name", "bot"), + "text": interject_text, + } + ] + prior_edges_post: dict[tuple[str, str], dict] = {} + for src in present_ids: + for tgt in present_ids: + if src == tgt: + continue + edge = get_edge(conn, src, tgt) or { + "affinity": 50, + "trust": 50, + "summary": "", + } + prior_edges_post[(src, tgt)] = edge + + state_updates_post = await compute_state_updates_for_present( + client, + classifier_model=settings.classifier_model, + present_ids=present_ids, + present_names=present_names, + personas=personas, + prior_edges=prior_edges_post, + recent_dialogue=recent_post_interject, + timeout_s=settings.classifier_timeout_s, + ) + for src_id, tgt_id, update in state_updates_post: + append_and_apply( + conn, + kind="edge_update", + payload={ + "source_id": src_id, + "target_id": tgt_id, + "chat_id": chat_id, + "affinity_delta": update.affinity_delta, + "trust_delta": update.trust_delta, + "knowledge_facts": update.knowledge_facts, + "last_interaction_at": last_at, + "last_interaction_chat_id": chat_id, + }, + ) + else: + # Classifier said "no follow-on this time" — supersede the + # original interjection without a replacement. Point the + # back-pointer at the new primary so the row is consistently + # hidden by the standard timeline filter. + conn.execute( + "UPDATE event_log SET superseded_by = ? WHERE id = ?", + (new_assistant_event_id, original_interjection_event_id), + ) + return new_text diff --git a/tests/test_regenerate.py b/tests/test_regenerate.py index b561d5f..7fa22bc 100644 --- a/tests/test_regenerate.py +++ b/tests/test_regenerate.py @@ -271,3 +271,394 @@ def test_regenerate_404_when_assistant_turn_missing(client, tmp_path): assert response.status_code == 404 finally: app.dependency_overrides.clear() + + +def _seed_with_interjection_group(db_path): + """Seed a multi-entity scene with a (primary + interjection) group. + + Returns ``(user_turn_id, primary_at_id, interjection_at_id)``. + + The primary speaker is the host (bot_a); the silent witness who + interjected is the guest (bot_b). Mirrors the convention in + chat/web/turns.py — both assistant_turns share the same + ``user_turn_id`` and the interjection's payload carries + ``interjection_of=``. + """ + with open_db(db_path) as conn: + for bot_id, name, persona in ( + ("bot_a", "BotA", "thoughtful"), + ("bot_b", "BotB", "loud"), + ): + append_event( + conn, + kind="bot_authored", + payload={ + "id": bot_id, + "name": name, + "persona": persona, + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "", + "kickoff_prose": "", + }, + ) + append_event( + conn, + kind="chat_created", + payload={ + "id": "chat_multi", + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1", + "weather": "", + }, + ) + for src, tgt in ( + ("bot_a", "you"), + ("you", "bot_a"), + ("bot_b", "you"), + ("you", "bot_b"), + ("bot_a", "bot_b"), + ("bot_b", "bot_a"), + ): + append_event( + conn, + kind="edge_update", + payload={ + "source_id": src, + "target_id": tgt, + "chat_id": "chat_multi", + }, + ) + for entity_id in ("you", "bot_a", "bot_b"): + append_event( + conn, + kind="activity_change", + payload={ + "entity_id": entity_id, + "posture": "sitting", + "action": {"verb": "talking"}, + "attention": "", + "holding": [], + "status": {}, + }, + ) + ut_id = append_event( + conn, + kind="user_turn", + payload={ + "chat_id": "chat_multi", + "prose": "hello", + "segments": [], + }, + ) + primary_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_multi", + "speaker_id": "bot_a", + "text": "Original primary.", + "truncated": False, + "user_turn_id": ut_id, + }, + ) + interjection_id = append_event( + conn, + kind="assistant_turn", + payload={ + "chat_id": "chat_multi", + "speaker_id": "bot_b", + "text": "Original interjection!", + "truncated": False, + "user_turn_id": ut_id, + "interjection_of": "bot_a", + }, + ) + project(conn) + return ut_id, primary_id, interjection_id + + +def test_regenerate_broadcasts_turn_html_over_sse( + tmp_path, monkeypatch +): + """T73.1: regenerate publishes a ``turn_html_replace`` SSE event so + connected tabs swap the prior turn's DOM node in place. + + The event carries: + - ``data``: rendered HTML for the new turn + - ``turn_id``: event_id of the new assistant_turn + - ``supersedes_id``: event_id of the original assistant_turn + """ + import asyncio + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.services import regenerate as regenerate_module + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + ut_id, at_id = _seed_with_one_turn(db_path) + + published: list[tuple[str, dict]] = [] + + async def _capture(chat_id, event): + published.append((chat_id, event)) + + # Patch the imported reference inside the regenerate module so the + # service's call site goes through our spy. + monkeypatch.setattr(regenerate_module, "publish", _capture) + + narrative_canned = "Refreshed reply." + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + canned = [narrative_canned, state_canned, state_canned] + mock_client = MockLLMClient(canned=list(canned)) + + settings = Settings(featherless_api_key="test") + + with open_db(db_path) as conn: + new_text = asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_bot_a", + original_assistant_event_id=at_id, + ) + ) + assert new_text == narrative_canned + + # Find the new assistant_turn event_id for cross-checking. + cur = conn.execute( + "SELECT id FROM event_log " + "WHERE kind = 'assistant_turn' AND id != ? " + "AND superseded_by IS NULL", + (at_id,), + ).fetchone() + new_at_id = cur[0] + + # Filter out per-token publishes; we want the replace broadcast. + replace_calls = [ + ev for (_cid, ev) in published if ev.get("event") == "turn_html_replace" + ] + assert len(replace_calls) == 1 + payload = replace_calls[0] + assert payload["supersedes_id"] == at_id + assert payload["turn_id"] == new_at_id + # The HTML carries the new narrative text and the speaker name. + assert "Refreshed reply." in payload["data"] + assert "BotA" in payload["data"] + # Sanity: every publish targeted this chat. + for cid, _ev in published: + assert cid == "chat_bot_a" + + +def test_regenerate_with_interjection_redoes_both_turns(tmp_path, monkeypatch): + """T73.2: when the original turn group included an interjection, both + the primary and the interjection are regenerated. + + Setup: 3-entity scene (host BotA + guest BotB + you) with a prior + (primary by BotA + interjection by BotB) group. Mock the + interjection classifier to return ``should_interject=True`` so the + follow-on regenerates too. + + Assert: 2 new assistant_turns exist for the same user_turn_id, the + second carrying ``interjection_of`` pointing at the new primary's + speaker_id. Both originals are superseded. + """ + import asyncio + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.services import regenerate as regenerate_module + from chat.services.interjection import InterjectionDecision + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + ut_id, primary_id, interjection_id = _seed_with_interjection_group(db_path) + + # Stub detect_interjection so the classifier "fires" with new prose. + async def _stub_should_interject(*_args, **_kwargs): + return InterjectionDecision(should_interject=True, reason="fired") + + monkeypatch.setattr( + regenerate_module, "detect_interjection", _stub_should_interject + ) + + # Canned queue: + # 1. New primary narrative stream. + # 2-7. Six state-update classifier calls (one per directed pair + # across host/you/guest = 6 pairs) for the primary pass. + # 8. New interjection narrative stream. + # 9-14. Six state-update classifier calls for the post-interjection + # pass. + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + canned: list[str] = [] + canned.append("New primary text.") + canned.extend([state_canned] * 6) + canned.append("New interjection text!") + canned.extend([state_canned] * 6) + mock_client = MockLLMClient(canned=list(canned)) + + settings = Settings(featherless_api_key="test") + + with open_db(db_path) as conn: + new_text = asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_multi", + original_assistant_event_id=primary_id, + ) + ) + assert new_text == "New primary text." + + # Both originals are superseded. + primary_super = conn.execute( + "SELECT superseded_by FROM event_log WHERE id = ?", (primary_id,) + ).fetchone()[0] + interjection_super = conn.execute( + "SELECT superseded_by FROM event_log WHERE id = ?", + (interjection_id,), + ).fetchone()[0] + assert primary_super is not None + assert interjection_super is not None + + # Two NEW assistant_turn events exist (the regenerated primary + # and the regenerated interjection), both pinned to the same + # user_turn_id as the originals. + cur = conn.execute( + "SELECT id, payload_json FROM event_log " + "WHERE kind = 'assistant_turn' AND id NOT IN (?, ?) " + "ORDER BY id", + (primary_id, interjection_id), + ).fetchall() + assert len(cur) == 2 + new_primary_id, new_primary_payload_json = cur[0] + new_interjection_id, new_interjection_payload_json = cur[1] + new_primary_payload = json.loads(new_primary_payload_json) + new_interjection_payload = json.loads(new_interjection_payload_json) + + assert new_primary_payload["text"] == "New primary text." + assert new_primary_payload["speaker_id"] == "bot_a" + assert new_primary_payload["user_turn_id"] == ut_id + assert new_primary_payload["regenerated_from"] == primary_id + assert "interjection_of" not in new_primary_payload + + assert new_interjection_payload["text"] == "New interjection text!" + assert new_interjection_payload["speaker_id"] == "bot_b" + assert new_interjection_payload["user_turn_id"] == ut_id + assert new_interjection_payload["regenerated_from"] == interjection_id + # interjection_of links to the new primary's speaker (matches + # the existing convention in chat/web/turns.py). + assert new_interjection_payload["interjection_of"] == "bot_a" + + # The originals' supersede pointers reach the new ones. + assert primary_super == new_primary_id + assert interjection_super == new_interjection_id + + +def test_regenerate_drops_interjection_when_classifier_returns_false( + tmp_path, monkeypatch +): + """T73.2: when the original group included an interjection but the + classifier returns False this time, the new group is primary-only. + + The original interjection is still superseded (we don't leave it + visible in the timeline alongside a regenerated primary it no longer + follows from), but no replacement assistant_turn is appended. + """ + import asyncio + + from chat.config import Settings + from chat.db.migrate import apply_migrations + from chat.services import regenerate as regenerate_module + from chat.services.interjection import InterjectionDecision + from chat.services.regenerate import regenerate_assistant_turn + + db_path = tmp_path / "test.db" + cfg = tmp_path / "config.toml" + cfg.write_text('featherless_api_key = "test"\n') + monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg)) + monkeypatch.setenv("CHAT_DB_PATH", str(db_path)) + apply_migrations(db_path) + + ut_id, primary_id, interjection_id = _seed_with_interjection_group(db_path) + + async def _stub_no_interject(*_args, **_kwargs): + return InterjectionDecision( + should_interject=False, reason="quiet" + ) + + monkeypatch.setattr( + regenerate_module, "detect_interjection", _stub_no_interject + ) + + # Canned queue: primary narrative + 6 state-update calls. No + # interjection stream because the classifier short-circuits. + state_canned = json.dumps( + {"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []} + ) + canned: list[str] = ["New primary text."] + [state_canned] * 6 + mock_client = MockLLMClient(canned=list(canned)) + + settings = Settings(featherless_api_key="test") + + with open_db(db_path) as conn: + new_text = asyncio.run( + regenerate_assistant_turn( + conn, + mock_client, + settings=settings, + chat_id="chat_multi", + original_assistant_event_id=primary_id, + ) + ) + assert new_text == "New primary text." + + # Original primary superseded by the new primary. + primary_super = conn.execute( + "SELECT superseded_by FROM event_log WHERE id = ?", (primary_id,) + ).fetchone()[0] + # Original interjection ALSO superseded — we don't leave a + # dangling beat attached to a regenerated primary that no longer + # warrants a follow-on. Back-pointer goes to the new primary. + interjection_super = conn.execute( + "SELECT superseded_by FROM event_log WHERE id = ?", + (interjection_id,), + ).fetchone()[0] + assert primary_super is not None + assert interjection_super is not None + assert interjection_super == primary_super # both point at new primary + + # Exactly ONE new assistant_turn — the primary; no replacement + # interjection. + cur = conn.execute( + "SELECT payload_json FROM event_log " + "WHERE kind = 'assistant_turn' AND id NOT IN (?, ?) " + "AND superseded_by IS NULL", + (primary_id, interjection_id), + ).fetchall() + assert len(cur) == 1 + new_primary_payload = json.loads(cur[0][0]) + assert new_primary_payload["text"] == "New primary text." + assert "interjection_of" not in new_primary_payload