perf: scope regenerate sibling-lookup to chat_id (T83.3)
The sibling assistant_turn lookup in ``regenerate_assistant_turn`` previously scanned every non-superseded ``assistant_turn`` row across the whole database and filtered in Python. With many chats in the log this is O(total_assistant_turns) per regenerate. Push the chat_id filter into SQL via ``json_extract(payload_json, '$.chat_id') = ?`` and add ``ORDER BY id DESC LIMIT 50`` so worst-case work is bounded even within a single chat. Mirrors the SQL pattern in ``chat.web.meanwhile._last_meanwhile_speaker``. Test added: test_regenerate_sibling_lookup_scoped_to_chat seeds two chats — the second has an interjection whose ``interjection_of`` value collides with the first chat's primary speaker. Regenerating chat A must leave chat B's rows untouched and the regenerated chat A interjection's ``regenerated_from`` must point at chat A's original (not chat B's). Pre-T83.3 a global query could in principle latch onto cross-chat rows.
This commit is contained in:
@@ -148,6 +148,13 @@ async def regenerate_assistant_turn(
|
|||||||
# the silent witness (the bot that wasn't the primary addressee).
|
# the silent witness (the bot that wasn't the primary addressee).
|
||||||
# Filter on ``superseded_by IS NULL`` so prior regenerates of this
|
# Filter on ``superseded_by IS NULL`` so prior regenerates of this
|
||||||
# group don't reappear as siblings.
|
# group don't reappear as siblings.
|
||||||
|
#
|
||||||
|
# T83.3: push the chat_id filter into SQL via ``json_extract`` so
|
||||||
|
# the query doesn't scan every assistant_turn row across the whole
|
||||||
|
# database. ``LIMIT 50`` bounds worst-case work even when chat_id
|
||||||
|
# isn't selective (e.g. a single chat with many turns) — we only
|
||||||
|
# need the one matching sibling. Mirrors the SQL pattern in
|
||||||
|
# ``chat.web.meanwhile._last_meanwhile_speaker``.
|
||||||
original_interjection_event_id: int | None = None
|
original_interjection_event_id: int | None = None
|
||||||
original_interjection_payload: dict | None = None
|
original_interjection_payload: dict | None = None
|
||||||
if original_user_turn_id is not None:
|
if original_user_turn_id is not None:
|
||||||
@@ -155,8 +162,11 @@ async def regenerate_assistant_turn(
|
|||||||
"SELECT id, payload_json FROM event_log "
|
"SELECT id, payload_json FROM event_log "
|
||||||
"WHERE kind = 'assistant_turn' "
|
"WHERE kind = 'assistant_turn' "
|
||||||
" AND id != ? "
|
" AND id != ? "
|
||||||
" AND superseded_by IS NULL",
|
" AND superseded_by IS NULL "
|
||||||
(original_assistant_event_id,),
|
" AND json_extract(payload_json, '$.chat_id') = ? "
|
||||||
|
"ORDER BY id DESC "
|
||||||
|
"LIMIT 50",
|
||||||
|
(original_assistant_event_id, chat_id),
|
||||||
)
|
)
|
||||||
for sib_id, sib_payload_json in sibling_cur.fetchall():
|
for sib_id, sib_payload_json in sibling_cur.fetchall():
|
||||||
sib_payload = json.loads(sib_payload_json)
|
sib_payload = json.loads(sib_payload_json)
|
||||||
|
|||||||
@@ -664,6 +664,182 @@ def test_regenerate_drops_interjection_when_classifier_returns_false(
|
|||||||
assert "interjection_of" not in new_primary_payload
|
assert "interjection_of" not in new_primary_payload
|
||||||
|
|
||||||
|
|
||||||
|
def test_regenerate_sibling_lookup_scoped_to_chat(tmp_path, monkeypatch):
|
||||||
|
"""T83.3: regenerate's sibling-interjection lookup is scoped to the
|
||||||
|
chat being regenerated.
|
||||||
|
|
||||||
|
Setup: TWO chats, each with a primary + interjection turn group whose
|
||||||
|
rows happen to share the same ``user_turn_id`` value (the projector
|
||||||
|
assigns event_log ids monotonically across the whole database, so
|
||||||
|
when each chat is seeded back-to-back the chat A primary lands on a
|
||||||
|
different ``user_turn_id`` than chat B's — but in older versions the
|
||||||
|
sibling query had no chat predicate, so it could in principle latch
|
||||||
|
onto a row from a different chat if ids collided in some unusual
|
||||||
|
flow). We construct the seeding so chat B's interjection has the
|
||||||
|
SAME ``interjection_of`` value as the chat A primary's speaker_id —
|
||||||
|
pre-T83.3 the global query could have picked it up.
|
||||||
|
|
||||||
|
Assert: regenerating the chat A primary leaves chat B's rows
|
||||||
|
untouched (no supersede), and the regenerated chat A turn group's
|
||||||
|
interjection (the only one regenerate should regenerate) has its
|
||||||
|
``regenerated_from`` pointing at the chat A original interjection,
|
||||||
|
not chat B's.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from chat.config import Settings
|
||||||
|
from chat.db.migrate import apply_migrations
|
||||||
|
from chat.services import regenerate as regenerate_module
|
||||||
|
from chat.services.interjection import InterjectionDecision
|
||||||
|
from chat.services.regenerate import regenerate_assistant_turn
|
||||||
|
|
||||||
|
db_path = tmp_path / "test.db"
|
||||||
|
cfg = tmp_path / "config.toml"
|
||||||
|
cfg.write_text('featherless_api_key = "test"\n')
|
||||||
|
monkeypatch.setenv("CHAT_CONFIG_PATH", str(cfg))
|
||||||
|
monkeypatch.setenv("CHAT_DB_PATH", str(db_path))
|
||||||
|
apply_migrations(db_path)
|
||||||
|
|
||||||
|
# Seed chat A's interjection group.
|
||||||
|
a_ut_id, a_primary_id, a_interjection_id = _seed_with_interjection_group(
|
||||||
|
db_path
|
||||||
|
)
|
||||||
|
|
||||||
|
# Seed chat B with the same shape but a different chat_id and bot
|
||||||
|
# ids, then add an interjection group whose ``interjection_of``
|
||||||
|
# points at "bot_a" so a global (unscoped) query could collide.
|
||||||
|
with open_db(db_path) as conn:
|
||||||
|
for bot_id, name in (("bot_c", "BotC"), ("bot_d", "BotD")):
|
||||||
|
append_event(
|
||||||
|
conn,
|
||||||
|
kind="bot_authored",
|
||||||
|
payload={
|
||||||
|
"id": bot_id,
|
||||||
|
"name": name,
|
||||||
|
"persona": "",
|
||||||
|
"voice_samples": [],
|
||||||
|
"traits": [],
|
||||||
|
"backstory": "",
|
||||||
|
"initial_relationship_to_you": "",
|
||||||
|
"kickoff_prose": "",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
append_event(
|
||||||
|
conn,
|
||||||
|
kind="chat_created",
|
||||||
|
payload={
|
||||||
|
"id": "chat_other",
|
||||||
|
"host_bot_id": "bot_c",
|
||||||
|
"guest_bot_id": "bot_d",
|
||||||
|
"initial_time": "2026-04-26T20:00:00+00:00",
|
||||||
|
"narrative_anchor": "Day 1",
|
||||||
|
"weather": "",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
b_ut_id = append_event(
|
||||||
|
conn,
|
||||||
|
kind="user_turn",
|
||||||
|
payload={
|
||||||
|
"chat_id": "chat_other",
|
||||||
|
"prose": "different chat",
|
||||||
|
"segments": [],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
b_primary_id = append_event(
|
||||||
|
conn,
|
||||||
|
kind="assistant_turn",
|
||||||
|
payload={
|
||||||
|
"chat_id": "chat_other",
|
||||||
|
"speaker_id": "bot_c",
|
||||||
|
"text": "Other primary.",
|
||||||
|
"truncated": False,
|
||||||
|
"user_turn_id": b_ut_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
# The chat B interjection's ``interjection_of`` references
|
||||||
|
# "bot_a" — the chat A primary's speaker. Pre-T83.3 the global
|
||||||
|
# sibling query could mis-match this row.
|
||||||
|
b_interjection_id = append_event(
|
||||||
|
conn,
|
||||||
|
kind="assistant_turn",
|
||||||
|
payload={
|
||||||
|
"chat_id": "chat_other",
|
||||||
|
"speaker_id": "bot_d",
|
||||||
|
"text": "Cross-chat noise.",
|
||||||
|
"truncated": False,
|
||||||
|
"user_turn_id": b_ut_id,
|
||||||
|
"interjection_of": "bot_a",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Stub the interjection classifier to return True so the regenerate
|
||||||
|
# actively walks the sibling-discovery path.
|
||||||
|
async def _stub_should_interject(*_args, **_kwargs):
|
||||||
|
return InterjectionDecision(should_interject=True, reason="fired")
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
regenerate_module, "detect_interjection", _stub_should_interject
|
||||||
|
)
|
||||||
|
|
||||||
|
state_canned = json.dumps(
|
||||||
|
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
|
||||||
|
)
|
||||||
|
canned: list[str] = (
|
||||||
|
["New chat A primary."]
|
||||||
|
+ [state_canned] * 6
|
||||||
|
+ ["New chat A interjection."]
|
||||||
|
+ [state_canned] * 6
|
||||||
|
)
|
||||||
|
mock_client = MockLLMClient(canned=list(canned))
|
||||||
|
settings = Settings(featherless_api_key="test")
|
||||||
|
|
||||||
|
with open_db(db_path) as conn:
|
||||||
|
new_text = asyncio.run(
|
||||||
|
regenerate_assistant_turn(
|
||||||
|
conn,
|
||||||
|
mock_client,
|
||||||
|
settings=settings,
|
||||||
|
chat_id="chat_multi",
|
||||||
|
original_assistant_event_id=a_primary_id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert new_text == "New chat A primary."
|
||||||
|
|
||||||
|
# Chat B rows are untouched — neither superseded nor referenced.
|
||||||
|
b_primary_super = conn.execute(
|
||||||
|
"SELECT superseded_by FROM event_log WHERE id = ?",
|
||||||
|
(b_primary_id,),
|
||||||
|
).fetchone()[0]
|
||||||
|
b_interjection_super = conn.execute(
|
||||||
|
"SELECT superseded_by FROM event_log WHERE id = ?",
|
||||||
|
(b_interjection_id,),
|
||||||
|
).fetchone()[0]
|
||||||
|
assert b_primary_super is None
|
||||||
|
assert b_interjection_super is None
|
||||||
|
|
||||||
|
# Chat A's regenerated interjection has its ``regenerated_from``
|
||||||
|
# pointing at chat A's original interjection — NOT chat B's.
|
||||||
|
cur = conn.execute(
|
||||||
|
"SELECT payload_json FROM event_log "
|
||||||
|
"WHERE kind = 'assistant_turn' "
|
||||||
|
" AND id NOT IN (?, ?, ?, ?) "
|
||||||
|
" AND superseded_by IS NULL",
|
||||||
|
(a_primary_id, a_interjection_id, b_primary_id, b_interjection_id),
|
||||||
|
).fetchall()
|
||||||
|
# Two new rows: regenerated primary + regenerated interjection.
|
||||||
|
assert len(cur) == 2
|
||||||
|
payloads = [json.loads(row[0]) for row in cur]
|
||||||
|
# Find the regenerated interjection (carries interjection_of).
|
||||||
|
new_interject_payloads = [
|
||||||
|
p for p in payloads if p.get("interjection_of")
|
||||||
|
]
|
||||||
|
assert len(new_interject_payloads) == 1
|
||||||
|
assert new_interject_payloads[0]["regenerated_from"] == a_interjection_id
|
||||||
|
# Pin chat scope on every new row.
|
||||||
|
for p in payloads:
|
||||||
|
assert p["chat_id"] == "chat_multi"
|
||||||
|
|
||||||
|
|
||||||
def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch):
|
def test_regenerate_registers_task_in_in_flight_tasks(tmp_path, monkeypatch):
|
||||||
"""T83.1: regenerate's streaming Task is registered in the chat-keyed
|
"""T83.1: regenerate's streaming Task is registered in the chat-keyed
|
||||||
``_in_flight_tasks`` dict so the /turns/cancel route can cancel a
|
``_in_flight_tasks`` dict so the /turns/cancel route can cancel a
|
||||||
|
|||||||
Reference in New Issue
Block a user