feat: wire embedding worker call sites in turns/meanwhile/skip/regenerate (T97.5)
This commit is contained in:
@@ -103,6 +103,7 @@ async def regenerate_assistant_turn(
|
|||||||
chat_id: str,
|
chat_id: str,
|
||||||
original_assistant_event_id: int,
|
original_assistant_event_id: int,
|
||||||
edited_user_prose: str | None = None,
|
edited_user_prose: str | None = None,
|
||||||
|
app=None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Regenerate the assistant turn linked to ``original_assistant_event_id``.
|
"""Regenerate the assistant turn linked to ``original_assistant_event_id``.
|
||||||
|
|
||||||
@@ -414,6 +415,7 @@ async def regenerate_assistant_turn(
|
|||||||
narrative_text=new_text,
|
narrative_text=new_text,
|
||||||
scene_id=scene["id"] if scene else None,
|
scene_id=scene["id"] if scene else None,
|
||||||
chat_clock_at=chat.get("time"),
|
chat_clock_at=chat.get("time"),
|
||||||
|
app=app,
|
||||||
)
|
)
|
||||||
|
|
||||||
last_at = chat.get("time")
|
last_at = chat.get("time")
|
||||||
@@ -648,6 +650,7 @@ async def regenerate_assistant_turn(
|
|||||||
narrative_text=interject_text,
|
narrative_text=interject_text,
|
||||||
scene_id=scene["id"] if scene else None,
|
scene_id=scene["id"] if scene else None,
|
||||||
chat_clock_at=chat.get("time"),
|
chat_clock_at=chat.get("time"),
|
||||||
|
app=app,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Re-run the multi-pair state-update with the post-interjection
|
# Re-run the multi-pair state-update with the post-interjection
|
||||||
|
|||||||
@@ -993,6 +993,7 @@ async def skip_elision(
|
|||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
new_time=new_time,
|
new_time=new_time,
|
||||||
landing_state_hint=landing_state_hint,
|
landing_state_hint=landing_state_hint,
|
||||||
|
app=request.app,
|
||||||
)
|
)
|
||||||
except ChatNotFoundError as exc:
|
except ChatNotFoundError as exc:
|
||||||
# Missing chat row: typed exception (T81) replaces the prior
|
# Missing chat row: typed exception (T81) replaces the prior
|
||||||
@@ -1036,6 +1037,7 @@ async def skip_jump(
|
|||||||
new_time=new_time,
|
new_time=new_time,
|
||||||
notable_prose=notable_prose,
|
notable_prose=notable_prose,
|
||||||
reset_activity=reset_flag,
|
reset_activity=reset_flag,
|
||||||
|
app=request.app,
|
||||||
)
|
)
|
||||||
except ChatNotFoundError as exc:
|
except ChatNotFoundError as exc:
|
||||||
# Missing chat row: typed exception (T81) replaces the prior
|
# Missing chat row: typed exception (T81) replaces the prior
|
||||||
|
|||||||
@@ -131,6 +131,7 @@ async def process_meanwhile_turn(
|
|||||||
*,
|
*,
|
||||||
chat_id: str,
|
chat_id: str,
|
||||||
prose: str,
|
prose: str,
|
||||||
|
app=None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Run one meanwhile turn end-to-end.
|
"""Run one meanwhile turn end-to-end.
|
||||||
|
|
||||||
@@ -314,6 +315,7 @@ async def process_meanwhile_turn(
|
|||||||
narrative_text=text,
|
narrative_text=text,
|
||||||
scene_id=scene_id,
|
scene_id=scene_id,
|
||||||
chat_clock_at=chat.get("time"),
|
chat_clock_at=chat.get("time"),
|
||||||
|
app=app,
|
||||||
)
|
)
|
||||||
|
|
||||||
# 9. Post-turn state-update — exactly 2 directed pairs over the
|
# 9. Post-turn state-update — exactly 2 directed pairs over the
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ async def process_elision_skip(
|
|||||||
chat_id: str,
|
chat_id: str,
|
||||||
new_time: str,
|
new_time: str,
|
||||||
landing_state_hint: str = "",
|
landing_state_hint: str = "",
|
||||||
|
app=None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Run an elision skip end-to-end.
|
"""Run an elision skip end-to-end.
|
||||||
|
|
||||||
@@ -175,6 +176,7 @@ async def process_jump_skip(
|
|||||||
new_time: str,
|
new_time: str,
|
||||||
notable_prose: str = "",
|
notable_prose: str = "",
|
||||||
reset_activity: bool = False,
|
reset_activity: bool = False,
|
||||||
|
app=None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Run a jump skip end-to-end.
|
"""Run a jump skip end-to-end.
|
||||||
|
|
||||||
@@ -254,6 +256,7 @@ async def process_jump_skip(
|
|||||||
chat_clock_at=new_time,
|
chat_clock_at=new_time,
|
||||||
source="synthesized",
|
source="synthesized",
|
||||||
significance=mem.significance,
|
significance=mem.significance,
|
||||||
|
app=app,
|
||||||
)
|
)
|
||||||
|
|
||||||
narration = await narrate_skip(
|
narration = await narrate_skip(
|
||||||
|
|||||||
@@ -248,6 +248,7 @@ async def post_turn(
|
|||||||
settings,
|
settings,
|
||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
prose=prose,
|
prose=prose,
|
||||||
|
app=request.app,
|
||||||
)
|
)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise HTTPException(status_code=400, detail=str(exc))
|
raise HTTPException(status_code=400, detail=str(exc))
|
||||||
@@ -352,6 +353,7 @@ async def post_turn(
|
|||||||
new_time=new_time,
|
new_time=new_time,
|
||||||
landing_state_hint=getattr(parsed, "landing_state_hint", "")
|
landing_state_hint=getattr(parsed, "landing_state_hint", "")
|
||||||
or "",
|
or "",
|
||||||
|
app=request.app,
|
||||||
)
|
)
|
||||||
except ChatNotFoundError as exc:
|
except ChatNotFoundError as exc:
|
||||||
# Defensive: chat existence is checked above, so this only
|
# Defensive: chat existence is checked above, so this only
|
||||||
@@ -512,6 +514,7 @@ async def post_turn(
|
|||||||
narrative_text=primary_text,
|
narrative_text=primary_text,
|
||||||
scene_id=scene["id"] if scene else None,
|
scene_id=scene["id"] if scene else None,
|
||||||
chat_clock_at=chat.get("time"),
|
chat_clock_at=chat.get("time"),
|
||||||
|
app=request.app,
|
||||||
)
|
)
|
||||||
|
|
||||||
# 7b. Post-turn state-update pass (Requirements §3.4 / T40). All
|
# 7b. Post-turn state-update pass (Requirements §3.4 / T40). All
|
||||||
@@ -746,6 +749,7 @@ async def post_turn(
|
|||||||
narrative_text=interjection_text,
|
narrative_text=interjection_text,
|
||||||
scene_id=scene["id"] if scene else None,
|
scene_id=scene["id"] if scene else None,
|
||||||
chat_clock_at=chat.get("time"),
|
chat_clock_at=chat.get("time"),
|
||||||
|
app=request.app,
|
||||||
)
|
)
|
||||||
|
|
||||||
# T74.2: enqueue a significance pass for the interjection
|
# T74.2: enqueue a significance pass for the interjection
|
||||||
@@ -1092,6 +1096,7 @@ async def regenerate_turn(
|
|||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
original_assistant_event_id=event_id,
|
original_assistant_event_id=event_id,
|
||||||
edited_user_prose=edited_prose,
|
edited_user_prose=edited_prose,
|
||||||
|
app=request.app,
|
||||||
)
|
)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise HTTPException(status_code=404, detail=str(e))
|
raise HTTPException(status_code=404, detail=str(e))
|
||||||
|
|||||||
@@ -0,0 +1,180 @@
|
|||||||
|
"""Phase 4 cross-feature integration tests (T97 follow-up).
|
||||||
|
|
||||||
|
Wave 8 / T101 will populate this file with the full Phase 4 retrieval +
|
||||||
|
embedding integration suite. For now this houses a single test pinning
|
||||||
|
the T97.5 wiring: the production turn route plumbs ``app=request.app``
|
||||||
|
all the way through ``record_turn_memory_for_present`` so the embedding
|
||||||
|
worker actually receives jobs in production. Without this fix-up the
|
||||||
|
plumbing added in T97 was dormant — every per-witness write took the
|
||||||
|
no-app branch and silently dropped the embed enqueue.
|
||||||
|
|
||||||
|
The test monkeypatches ``app.state.embedding_worker.enqueue`` to record
|
||||||
|
jobs (rather than draining the worker mid-test) so the assertion is
|
||||||
|
deterministic and free of asyncio-timing flakiness inside FastAPI's
|
||||||
|
TestClient. The bug we're guarding against is "did the call site pass
|
||||||
|
``app`` at all" — the worker's drain path is exercised in
|
||||||
|
:mod:`tests.test_embedding_worker`, so duplicating that here would add
|
||||||
|
no coverage.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
from chat.app import app
|
||||||
|
from chat.db.connection import open_db
|
||||||
|
from chat.eventlog.log import append_event
|
||||||
|
from chat.eventlog.projector import project
|
||||||
|
from chat.llm.mock import MockLLMClient
|
||||||
|
|
||||||
|
|
||||||
|
def _zero_state() -> str:
|
||||||
|
return json.dumps(
|
||||||
|
{"affinity_delta": 0, "trust_delta": 0, "knowledge_facts": []}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _override_llm(canned: list[str]) -> MockLLMClient:
    """Route the app's ``get_llm_client`` dependency to a canned mock.

    A fresh ``MockLLMClient`` is built from a copy of ``canned`` (so the
    caller's list is not shared with the mock), registered in
    ``app.dependency_overrides``, and returned for the caller to inspect.
    """
    # Imported here rather than at module top, as in the original helper.
    from chat.web.kickoff import get_llm_client

    client = MockLLMClient(canned=[*canned])

    def _provide_mock() -> MockLLMClient:
        return client

    app.dependency_overrides[get_llm_client] = _provide_mock
    return client
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def app_state_setup(tmp_path, monkeypatch):
    """Yield a ``TestClient`` wired to a throwaway config file and database.

    ``CHAT_CONFIG_PATH`` and ``CHAT_DB_PATH`` are pointed at files under
    ``tmp_path`` before the client starts; dependency overrides installed
    during the test are dropped once the test is done.
    """
    config_file = tmp_path / "config.toml"
    db_file = tmp_path / "test.db"

    config_file.write_text('featherless_api_key = "test"\n')
    monkeypatch.setenv("CHAT_CONFIG_PATH", str(config_file))
    monkeypatch.setenv("CHAT_DB_PATH", str(db_file))

    with TestClient(app) as client:
        # Switch the background worker off so that only the request path
        # pulls from the canned-response queue. The embedding worker is
        # left running, but the test swaps out ``enqueue`` on that worker
        # instance, so its loop never sees the jobs the test captures.
        app.state.background_worker.enabled = False
        yield client
    app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
def _seed(db_path: Path) -> None:
    """Populate the database at ``db_path`` with a minimal one-bot chat.

    Mirrors ``tests/test_turn_flow.py::_seed``: one authored bot, one chat
    hosted by it, one bot -> "you" edge, and an activity for each of the two
    participants, so the prompt assembler has something to render. The
    events are appended in that order and then projected.
    """
    participants = (("you", "talking"), ("bot_a", "listening"))

    seed_events = [
        (
            "bot_authored",
            {
                "id": "bot_a",
                "name": "BotA",
                "persona": "thoughtful, observant",
                "voice_samples": [],
                "traits": [],
                "backstory": "",
                "initial_relationship_to_you": "",
                "kickoff_prose": "...",
            },
        ),
        (
            "chat_created",
            {
                "id": "chat_bot_a",
                "host_bot_id": "bot_a",
                "initial_time": "2026-04-26T20:00:00+00:00",
                "narrative_anchor": "Day 1",
                "weather": "",
            },
        ),
        (
            "edge_update",
            {
                "source_id": "bot_a",
                "target_id": "you",
                "chat_id": "chat_bot_a",
                "knowledge_facts": ["coworker"],
            },
        ),
    ]
    # One activity_change per participant; only entity and verb differ.
    seed_events.extend(
        (
            "activity_change",
            {
                "entity_id": entity_id,
                "posture": "sitting",
                "action": {
                    "verb": verb,
                    "interruptible": True,
                    "required_attention": "low",
                    "expected_duration": "ongoing",
                },
                "attention": "",
                "holding": [],
                "status": {},
            },
        )
        for entity_id, verb in participants
    )

    with open_db(db_path) as conn:
        for event_kind, event_payload in seed_events:
            append_event(conn, kind=event_kind, payload=event_payload)
        project(conn)
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_turn_embeddings_indexed_via_worker_hook(
    app_state_setup, tmp_path
):
    """Posting a turn must hand an embedding job to the app's worker.

    The turn route is expected to forward ``app=request.app`` down to
    ``record_turn_memory_for_present`` so that the per-witness write
    enqueues an :class:`EmbeddingJob` on ``app.state.embedding_worker``.
    Before the T97.5 wiring the call site left ``app=`` out, and the
    helper's ``app is None`` branch skipped every enqueue without a sound,
    so this test would fail.

    ``enqueue`` on the live worker is swapped for a list append instead of
    draining the queue mid-request; that keeps the assertion independent of
    asyncio scheduling inside the TestClient. Only the wiring is pinned
    here; the drain path has its own coverage in
    :mod:`tests.test_embedding_worker`.
    """
    db_file = tmp_path / "test.db"
    _seed(db_file)

    parse_reply = json.dumps(
        {"segments": [{"kind": "dialogue", "text": "hello"}]}
    )
    llm_script = [parse_reply, "Hi there.", _zero_state(), _zero_state()]
    _override_llm(llm_script)

    recorded_jobs: list = []
    embedding_worker = app.state.embedding_worker
    real_enqueue = embedding_worker.enqueue
    embedding_worker.enqueue = recorded_jobs.append  # type: ignore[assignment]
    try:
        resp = app_state_setup.post(
            "/chats/chat_bot_a/turns", data={"prose": "hello"}
        )
        assert resp.status_code == 204
    finally:
        # Always put the real method back and drop the LLM override, even
        # when the request or the status assertion fails.
        embedding_worker.enqueue = real_enqueue  # type: ignore[assignment]
        app.dependency_overrides.clear()

    # A single-bot turn yields one ``memory_written`` and therefore exactly
    # one EmbeddingJob, whose text is the assistant's narrative reply.
    assert len(recorded_jobs) == 1
    job = recorded_jobs[0]
    assert job.text == "Hi there."

    # The job must reference a memory row that was projected for the bot.
    with open_db(db_file) as conn:
        rows = conn.execute(
            "SELECT id FROM memories WHERE owner_id = ?",
            ("bot_a",),
        ).fetchall()
    known_memory_ids = [row[0] for row in rows]
    assert job.memory_id in known_memory_ids
|
||||||
Reference in New Issue
Block a user