# Source: chat/chat/services/scene_summarize.py (Python)
"""Per-POV scene summary and edge summary update on scene close (T27).
When a scene closes — either auto-detected by the hard-signal classifier
in T26 or fired by the manual close button on the drawer — we run a
single-shot classifier per present witness that produces three signals
in one pass:
* ``summary`` — a 2-4 sentence per-POV recap of the scene from this
witness's perspective. Different from omniscient narration; focuses on
what the witness noticed/felt/remembers.
* ``knowledge_facts`` — concrete new things this witness learned about
the user during the scene. Promoted to the directed edge's
``knowledge`` list via ``edge_update``.
* ``relationship_summary`` — a 1-2 sentence delta on how the
witness's relationship to the user shifted in this scene. v1
combines this with the prior edge summary by simple concatenation —
the LLM is asked to phrase ``relationship_summary`` as a merge-ready
fragment, so the result reads naturally without a second classifier
round-trip.
Phase 1 was single-bot, so only the host bot was summarized; when the
chat also has a guest bot, :func:`apply_scene_close_summary` runs the
same per-witness pass for the guest. "you" doesn't have a memory store
in v1 so per-POV writes for the user are deferred. The
:func:`apply_scene_close_summary` driver is intentionally tolerant: if
no memories belong to the closed scene it silently skips the rewrite,
and a flapping classifier returns the empty default so the close flow
keeps moving.
"""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from sqlite3 import Connection
from pydantic import BaseModel, Field
from chat.eventlog.log import append_and_apply
from chat.llm.classify import classify
from chat.llm.client import LLMClient
class ScenePOVSummary(BaseModel):
    """Classifier output: one witness's view of a closing scene.
    Defaults are an inert no-op so a classifier failure is harmless —
    callers can apply the result unconditionally and end up not
    rewriting anything when the model misbehaves.
    """
    # Per-POV recap of the scene. An empty string makes
    # _summarize_and_apply_for_witness skip the memory pov_summary rewrite.
    summary: str = ""
    # New facts the witness learned about the user. When non-empty they are
    # sent to the bot -> you edge in an ``edge_update`` event.
    knowledge_facts: list[str] = Field(default_factory=list)
    # Fragment that gets concatenated onto the prior bot -> you edge summary.
    # An empty string leaves the edge summary untouched.
    relationship_summary: str = ""
# System prompt for the per-POV scene summary classifier. It is rendered by
# summarize_scene via ``str.format(bot_name=...)``, so ``{bot_name}`` is the
# only placeholder; any literal brace added to this text must be doubled.
_SYSTEM_TEMPLATE = (
    "You are summarizing a roleplay scene from {bot_name}'s point of "
    "view. Read the dialogue, then output JSON with exactly three "
    "fields:\n"
    "- summary: 2-4 sentences, in {bot_name}'s POV, of what happened "
    "in the scene. This is NOT omniscient narration — focus on what "
    "{bot_name} noticed, felt, and would remember.\n"
    "- knowledge_facts: list of NEW factual things {bot_name} learned "
    "about the user during this scene. Use specific stated content; do "
    "not infer or interpret. Empty list is fine.\n"
    "- relationship_summary: a SHORT (1-2 sentence) summary of how "
    "{bot_name}'s relationship with the user changed or developed in "
    "this scene. Phrase it so it reads as a continuation of the prior "
    "summary; the caller will concatenate them.\n\n"
    "Be specific. Avoid generic phrases."
)
def _format_dialogue(dialogue: list[dict]) -> str:
    """Render dialogue turns as newline-separated ``speaker: text`` lines.

    Each turn is a dict; a missing ``speaker`` key renders as ``?`` and a
    missing ``text`` key renders as an empty string. An empty (or falsy)
    ``dialogue`` yields the placeholder ``(no dialogue)`` so the prompt
    never contains a blank transcript section.
    """
    if dialogue:
        rendered: list[str] = []
        for entry in dialogue:
            who = entry.get("speaker", "?")
            said = entry.get("text", "")
            rendered.append(f"{who}: {said}")
        return "\n".join(rendered)
    return "(no dialogue)"
async def summarize_scene(
    client: LLMClient,
    *,
    model: str,
    bot_name: str,
    bot_persona: str,
    you_name: str,
    prior_edge_summary: str,
    dialogue: list[dict],
    timeout_s: float = 10.0,
) -> ScenePOVSummary:
    """Classify one witness's view of a scene into a ``ScenePOVSummary``.

    The system prompt is ``_SYSTEM_TEMPLATE`` rendered with ``bot_name``.
    The user prompt addresses the model as the bot ("YOU are ..."), then
    lists the user's name, the prior ``bot -> you`` edge summary (so the
    model can phrase ``relationship_summary`` as a continuation) and the
    formatted dialogue.

    Args:
        client: LLM client forwarded to ``classify``.
        model: Classifier model identifier.
        bot_name: Display name of the witness bot.
        bot_persona: Persona text; a falsy value is rendered as
            ``(no persona on file)``.
        you_name: Display name of the user.
        prior_edge_summary: Existing edge summary; a falsy value is
            rendered as ``(empty)``.
        dialogue: Turns as ``{"speaker": ..., "text": ...}`` dicts.
        timeout_s: Timeout forwarded to ``classify``.

    Returns:
        Whatever ``classify`` returns for the ``ScenePOVSummary`` schema.
        An empty ``ScenePOVSummary()`` is supplied as its ``default``;
        presumably that is what comes back when classification fails
        (``classify`` is defined outside this file — confirm there).
    """
    persona_text = bot_persona or "(no persona on file)"
    prior_text = prior_edge_summary or "(empty)"
    transcript = _format_dialogue(dialogue)

    # Joined with "\n"; the empty entries produce the blank separator lines.
    prompt_lines = [
        f"YOU are {bot_name}. {persona_text}",
        f"USER name: {you_name}",
        f"PRIOR EDGE SUMMARY ({bot_name} -> {you_name}): {prior_text}",
        "",
        "DIALOGUE:",
        transcript,
        "",
        f"Produce the JSON summary in {bot_name}'s POV.",
    ]
    user_prompt = "\n".join(prompt_lines)
    system_prompt = _SYSTEM_TEMPLATE.format(bot_name=bot_name)

    result = await classify(
        client,
        model=model,
        system=system_prompt,
        user=user_prompt,
        schema=ScenePOVSummary,
        default=ScenePOVSummary(),
        timeout_s=timeout_s,
    )
    return result
def _read_recent_dialogue(
    conn: Connection, chat_id: str, *, limit: int = 50
) -> list[dict]:
    """Return the last ``limit`` visible user/assistant turns of ``chat_id``.

    ``user_turn`` / ``assistant_turn`` payloads carry a ``chat_id`` but no
    ``scene_id``, so the closing scene's transcript is approximated by the
    chat's most recent turns. Rows with ``superseded_by`` set or
    ``hidden != 0`` are excluded so regenerated turns do not leak into the
    summary.

    The ``chat_id`` filter lives in the JSON payload, so it is applied in
    Python while streaming rows newest-first, and the scan stops as soon
    as ``limit`` matching turns are collected. Applying ``LIMIT`` in SQL
    *before* that filter (the previous behaviour) let recent turns from
    other chats crowd this chat's turns out of the window, yielding a
    truncated or empty transcript.

    Args:
        conn: SQLite connection holding the ``event_log`` table.
        chat_id: Chat whose turns are wanted.
        limit: Maximum number of turns to return. ``0`` returns an empty
            list; a negative value means "no cap", mirroring SQLite's
            ``LIMIT`` semantics.

    Returns:
        Turns in chronological (ascending ``id``) order, each shaped as
        ``{"speaker": ..., "text": ...}``. User turns use speaker ``"you"``
        and the payload's ``prose``; assistant turns use the payload's
        ``speaker_id`` (default ``"bot"``) and ``text``.
    """
    if limit == 0:
        return []

    cur = conn.execute(
        "SELECT kind, payload_json FROM event_log "
        "WHERE kind IN ('user_turn', 'assistant_turn') "
        " AND superseded_by IS NULL AND hidden = 0 "
        "ORDER BY id DESC"
    )
    newest_first: list[dict] = []
    try:
        for kind, payload_json in cur:
            p = json.loads(payload_json)
            if p.get("chat_id") != chat_id:
                continue
            if kind == "user_turn":
                newest_first.append(
                    {"speaker": "you", "text": p.get("prose", "")}
                )
            else:
                newest_first.append(
                    {
                        "speaker": p.get("speaker_id", "bot"),
                        "text": p.get("text", ""),
                    }
                )
            # Stop early: older rows cannot be part of the window.
            if len(newest_first) == limit:
                break
    finally:
        # The scan may stop mid-result-set; release the statement now
        # rather than waiting for garbage collection.
        cur.close()

    newest_first.reverse()
    return newest_first
async def _summarize_and_apply_for_witness(
    conn: Connection,
    client: LLMClient,
    *,
    classifier_model: str,
    chat_id: str,
    scene_id: int,
    bot_id: str,
    you_name: str,
    dialogue: list[dict],
    timeout_s: float,
    key_quotes_suffix: str = "",
) -> ScenePOVSummary:
    """Summarize the scene for one bot witness and apply the results.

    Runs :func:`summarize_scene` with the bot's name, persona and prior
    ``bot -> you`` edge summary, then emits up to three kinds of event:

    * one ``manual_edit`` (``memory_pov_summary``) per memory row owned by
      ``bot_id`` in ``scene_id``, replacing its ``pov_summary`` with the
      classifier summary plus ``key_quotes_suffix``;
    * one ``manual_edit`` (``edge_summary``) that appends the classifier's
      ``relationship_summary`` to the prior edge summary;
    * one ``edge_update`` carrying the classifier's ``knowledge_facts``.

    Each write is skipped when its input is missing: an empty summary
    leaves the memories alone, a missing edge row or empty
    ``relationship_summary`` leaves the edge summary alone, and an empty
    ``knowledge_facts`` list emits no ``edge_update``.

    Args:
        conn: SQLite connection used for reads and event application.
        client: LLM client forwarded to the classifier.
        classifier_model: Classifier model identifier.
        chat_id: Chat the scene belongs to (recorded on ``edge_update``).
        scene_id: Scene whose memories are rewritten.
        bot_id: Witness bot id.
        you_name: Display name of the user.
        dialogue: Scene transcript as ``{"speaker", "text"}`` dicts.
        timeout_s: Classifier timeout.
        key_quotes_suffix: Text appended verbatim to the per-POV summary
            before it is written; empty string is a no-op.

    Returns:
        The classifier output for this witness.
    """
    from chat.state.edges import get_edge
    from chat.state.entities import get_bot

    bot = get_bot(conn, bot_id) or {"name": bot_id, "persona": ""}
    edge_b2y = get_edge(conn, bot_id, "you")
    prior_summary = (edge_b2y or {}).get("summary", "") or ""
    pov = await summarize_scene(
        client,
        model=classifier_model,
        # Fall back to the id when the stored name is missing, None or
        # empty, matching the fallback apply_scene_close_summary uses;
        # otherwise the prompt would read "None's point of view".
        bot_name=bot.get("name") or bot_id,
        bot_persona=bot.get("persona", "") or "",
        you_name=you_name,
        prior_edge_summary=prior_summary,
        dialogue=dialogue,
        timeout_s=timeout_s,
    )

    # Rewrite memories belonging to the closed scene for this witness.
    # An empty summary (classifier default) keeps the existing per-turn
    # pov_summary in place, so the lookup is skipped entirely.
    if pov.summary:
        new_value = pov.summary + key_quotes_suffix
        # fetchall() materializes the rows before any event is applied.
        memory_rows = conn.execute(
            "SELECT id, pov_summary FROM memories "
            "WHERE scene_id = ? AND owner_id = ?",
            (scene_id, bot_id),
        ).fetchall()
        for memory_id, prior_pov in memory_rows:
            append_and_apply(
                conn,
                kind="manual_edit",
                payload={
                    "target_kind": "memory_pov_summary",
                    "target_id": int(memory_id),
                    "prior_value": prior_pov,
                    "new_value": new_value,
                },
            )

    # Update this bot->you edge summary if we have an edge row and a
    # non-empty relationship_summary to merge.
    if edge_b2y is not None and pov.relationship_summary:
        new_summary = (
            f"{prior_summary} {pov.relationship_summary}".strip()
            if prior_summary
            else pov.relationship_summary
        )
        append_and_apply(
            conn,
            kind="manual_edit",
            payload={
                "target_kind": "edge_summary",
                "target_id": {
                    "source_id": bot_id,
                    "target_id": "you",
                },
                "prior_value": prior_summary,
                "new_value": new_summary,
            },
        )

    # Append knowledge_facts to this bot->you edge if present.
    if pov.knowledge_facts:
        append_and_apply(
            conn,
            kind="edge_update",
            payload={
                "source_id": bot_id,
                "target_id": "you",
                "chat_id": chat_id,
                "knowledge_facts": list(pov.knowledge_facts),
            },
        )
    return pov
def _build_key_quotes_suffix(conn: Connection, scene_id: int) -> str:
    """Build the ``Key quotes:`` suffix for a closing scene, or ``""``.

    If the highest ``significance`` among the scene's memory rows is below
    2 (or the scene has no rows), the empty string is returned so the
    per-POV summaries carry no raw text. Otherwise the three rows with the
    highest significance (ties broken by ascending ``id``) are quoted.

    The quote text is each row's current ``pov_summary``, so this must be
    called BEFORE the per-POV rewrite replaces that column. Each quote is
    truncated to 200 characters to bound memory row growth. Rows whose
    ``pov_summary`` is NULL or blank are dropped rather than rendered as
    an empty ``- ""`` line; if nothing quotable remains the result is
    ``""``.

    Args:
        conn: SQLite connection holding the ``memories`` table.
        scene_id: Scene whose memories are inspected.

    Returns:
        ``"\\n\\nKey quotes:\\n- \\"...\\""`` with one line per quote, or
        the empty string.
    """
    row = conn.execute(
        "SELECT MAX(significance) FROM memories WHERE scene_id = ?",
        (scene_id,),
    ).fetchone()
    # MAX() over zero rows (or all-NULL significance) yields NULL -> 0.
    max_sig = (row[0] if row else None) or 0
    if max_sig < 2:
        return ""

    cur = conn.execute(
        "SELECT pov_summary FROM memories WHERE scene_id = ? "
        "ORDER BY significance DESC, id ASC LIMIT 3",
        (scene_id,),
    )
    quotes = [
        text[:200]
        for text in ((r[0] or "") for r in cur.fetchall())
        if text.strip()
    ]
    if not quotes:
        return ""
    lines = "\n".join(f'- "{q}"' for q in quotes)
    return f"\n\nKey quotes:\n{lines}"
async def apply_scene_close_summary(
    conn: Connection,
    client: LLMClient,
    *,
    classifier_model: str,
    chat_id: str,
    scene_id: int,
    host_bot_id: str,
    timeout_s: float = 10.0,
) -> ScenePOVSummary:
    """Drive the per-POV summary pipeline after ``scene_closed``.

    Steps, in order:

    1. Gather the chat's recent dialogue from the event log.
    2. Build the "Key quotes:" suffix from the scene's memory rows. This
       happens before any rewrite because the rewrite replaces the
       ``pov_summary`` text the quotes are taken from.
    3. For the host bot, and for the guest bot when the chat row has a
       ``guest_bot_id``, run :func:`_summarize_and_apply_for_witness`
       (memory ``pov_summary`` rewrite, edge summary update,
       ``knowledge_facts`` append).
    4. If the guest was summarized, at least one of the two POV summaries
       is non-empty, and a ``group_node`` row exists for the chat, merge
       the two summaries with :func:`merge_group_summary` and append a
       ``group_node_updated`` event carrying the merged ``summary`` and
       ``dynamic``. When both POV summaries are empty the update is
       skipped so an existing group summary is not overwritten with an
       empty merge.
    5. Run thread detection over the same dialogue and translate each
       candidate into a ``thread_opened`` / ``thread_updated`` /
       ``thread_closed`` event. Any exception raised by detection is
       logged and treated as "no candidates" so the close flow continues.

    Args:
        conn: SQLite connection used for reads and event application.
        client: LLM client forwarded to every classifier call.
        classifier_model: Classifier model identifier.
        chat_id: Chat whose scene just closed.
        scene_id: The closed scene.
        host_bot_id: The chat's host bot.
        timeout_s: Timeout applied to each classifier call.

    Returns:
        The host's :class:`ScenePOVSummary` (the guest's is not returned).
    """
    # Local imports to keep the module-level surface tight and avoid
    # any chance of a circular dep through chat.state.*.
    import logging

    from chat.services.thread_detection import (
        ThreadDetectionResult,
        detect_threads,
    )
    from chat.state.entities import get_bot, get_you
    from chat.state.group_node import get_group_node
    from chat.state.threads import list_open_threads
    from chat.state.world import get_chat

    you_entity = get_you(conn) or {"name": "you", "persona": ""}
    you_name = you_entity.get("name", "you") or "you"
    chat = get_chat(conn, chat_id) or {}
    guest_bot_id = chat.get("guest_bot_id")
    dialogue = _read_recent_dialogue(conn, chat_id)

    # Build the "Key quotes:" suffix BEFORE the per-POV rewrites land —
    # the quote source is the pov_summary text on each memory row, which
    # the rewrite about to fire would clobber.
    key_quotes_suffix = _build_key_quotes_suffix(conn, scene_id)

    host_pov = await _summarize_and_apply_for_witness(
        conn,
        client,
        classifier_model=classifier_model,
        chat_id=chat_id,
        scene_id=scene_id,
        bot_id=host_bot_id,
        you_name=you_name,
        dialogue=dialogue,
        timeout_s=timeout_s,
        key_quotes_suffix=key_quotes_suffix,
    )
    guest_pov: ScenePOVSummary | None = None
    if guest_bot_id is not None:
        guest_pov = await _summarize_and_apply_for_witness(
            conn,
            client,
            classifier_model=classifier_model,
            chat_id=chat_id,
            scene_id=scene_id,
            bot_id=guest_bot_id,
            you_name=you_name,
            dialogue=dialogue,
            timeout_s=timeout_s,
            key_quotes_suffix=key_quotes_suffix,
        )

    # Group node update: a third classifier call merges the two per-POV
    # summaries into a group-level view plus a brief group-dynamic note.
    # Only fires when both witnesses ran, there is something to merge,
    # and a group_node row exists for this chat.
    if (
        guest_pov is not None
        and (host_pov.summary or guest_pov.summary)
        and get_group_node(conn, chat_id) is not None
    ):
        host_bot = get_bot(conn, host_bot_id) or {"name": host_bot_id}
        guest_bot = get_bot(conn, guest_bot_id) or {"name": guest_bot_id}
        host_name = host_bot.get("name", host_bot_id) or host_bot_id
        guest_name = guest_bot.get("name", guest_bot_id) or guest_bot_id
        merged = await merge_group_summary(
            client,
            classifier_model=classifier_model,
            host_name=host_name,
            host_pov_summary=host_pov.summary,
            guest_name=guest_name,
            guest_pov_summary=guest_pov.summary,
            timeout_s=timeout_s,
        )
        append_and_apply(
            conn,
            kind="group_node_updated",
            payload={
                "chat_id": chat_id,
                "summary": merged.summary,
                "dynamic": merged.dynamic,
            },
        )

    # Thread detection on close reuses the dialogue gathered above. It is
    # best-effort: a failure must not abort the close pipeline, but it is
    # logged so it does not go unnoticed.
    try:
        thread_result = await detect_threads(
            client,
            classifier_model=classifier_model,
            scene_transcript=dialogue,
            open_threads=list_open_threads(conn, chat_id),
            timeout_s=timeout_s,
        )
    except Exception:
        logging.getLogger(__name__).exception(
            "thread detection failed on scene close "
            "(chat_id=%s, scene_id=%s); skipping thread updates",
            chat_id,
            scene_id,
        )
        thread_result = ThreadDetectionResult()

    _apply_thread_candidates(
        conn,
        chat_id=chat_id,
        scene_id=scene_id,
        candidates=thread_result.candidates,
    )
    return host_pov


def _apply_thread_candidates(
    conn: Connection, *, chat_id: str, scene_id: int, candidates
) -> None:
    """Translate thread-detection candidates into thread lifecycle events.

    ``open`` creates a new thread with a freshly generated id; ``update``
    and ``close`` act only on candidates that name an existing thread.
    Candidates with any other action, or an update/close without an
    ``existing_thread_id``, are ignored.
    """
    for cand in candidates:
        if cand.action == "open":
            new_thread_id = f"thr_{uuid.uuid4().hex[:12]}"
            append_and_apply(
                conn,
                kind="thread_opened",
                payload={
                    "thread_id": new_thread_id,
                    "chat_id": chat_id,
                    "title": cand.title,
                    "summary": cand.summary,
                },
            )
        elif cand.action == "update" and cand.existing_thread_id:
            append_and_apply(
                conn,
                kind="thread_updated",
                payload={
                    "thread_id": cand.existing_thread_id,
                    "summary": cand.summary,
                    "last_referenced_scene_id": scene_id,
                },
            )
        elif cand.action == "close" and cand.existing_thread_id:
            append_and_apply(
                conn,
                kind="thread_closed",
                payload={
                    "thread_id": cand.existing_thread_id,
                    "closed_at": datetime.now(timezone.utc).isoformat(),
                },
            )
class GroupMetaSummary(BaseModel):
    """Classifier output: a merged group-level view of a closed scene.
    Defaults are an empty no-op so callers can use the schema's default
    as a sentinel; in practice :func:`merge_group_summary` builds an
    explicit naive-concat fallback rather than returning these defaults
    directly so existing Phase 2 behavior is preserved on classifier
    failure.
    """
    # Merged group-level summary; apply_scene_close_summary writes it to the
    # ``summary`` field of the ``group_node_updated`` payload.
    summary: str = ""
    # Group-dynamic note written to the payload's ``dynamic`` field; the
    # naive-concat fallback built by merge_group_summary leaves it empty.
    dynamic: str = ""
# System prompt for the group-merge classifier call in merge_group_summary.
# It is passed as-is (no placeholders); the two POV summaries travel in the
# user message.
_GROUP_MERGE_SYSTEM = (
    "Given two per-POV scene summaries from a 3-entity scene (you + "
    "host + guest), produce a coherent group-level summary capturing "
    "the shared events as both witnesses experienced them, plus a "
    "brief 'dynamic' note describing the trio's group dynamic during "
    "the scene. Output strict JSON matching schema."
)
async def merge_group_summary(
    client: LLMClient,
    *,
    classifier_model: str,
    host_name: str,
    host_pov_summary: str,
    guest_name: str,
    guest_pov_summary: str,
    timeout_s: float = 30.0,
) -> GroupMetaSummary:
    """Merge the host's and guest's POV summaries into one group view.

    Sends both summaries, labelled by witness name and role, to the
    classifier with the ``_GROUP_MERGE_SYSTEM`` prompt and the
    ``GroupMetaSummary`` schema.

    Args:
        client: LLM client forwarded to ``classify``.
        classifier_model: Classifier model identifier.
        host_name: Display name of the host bot.
        host_pov_summary: The host's per-POV scene summary.
        guest_name: Display name of the guest bot.
        guest_pov_summary: The guest's per-POV scene summary.
        timeout_s: Timeout forwarded to ``classify``.

    Returns:
        Whatever ``classify`` returns. The ``default`` handed to it is a
        naive ``"<name>: <summary>"`` concatenation of the two POVs with
        an empty ``dynamic``; presumably that is what comes back on
        classifier failure (``classify`` is defined outside this file —
        confirm there).
    """
    witnesses = (
        (host_name, host_pov_summary),
        (guest_name, guest_pov_summary),
    )
    naive_concat = "\n\n".join(
        f"{name}: {text}" for name, text in witnesses
    )
    prompt_sections = [
        f"{host_name} (host) POV summary:\n{host_pov_summary}",
        f"{guest_name} (guest) POV summary:\n{guest_pov_summary}",
    ]
    merged = await classify(
        client,
        model=classifier_model,
        system=_GROUP_MERGE_SYSTEM,
        user="\n\n".join(prompt_sections),
        schema=GroupMetaSummary,
        default=GroupMetaSummary(summary=naive_concat, dynamic=""),
        timeout_s=timeout_s,
    )
    return merged