diff --git a/chat/services/event_promotion.py b/chat/services/event_promotion.py new file mode 100644 index 0000000..b1c8c3a --- /dev/null +++ b/chat/services/event_promotion.py @@ -0,0 +1,149 @@ +"""Event-completion promotion (T56). + +When an event reaches ``status='completed'``, read its ``props_json`` +and emit promotion events into the appropriate state stores. +Synchronous, no LLM. Skips when the event status is not ``completed`` +(cancelled / expired terminate the event without promoting). + +Props recognized: + +- ``acquired_objects: list[str]`` — emits a ``manual_edit`` with + ``target_kind="memory_pov_summary"`` per object on the host's memory + row, recording the acquisition. Phase 3 is a stub: it requires both + ``host_bot_id`` and ``host_memory_id`` (an existing memories.id) to + be present in props; missing either skips that object cleanly. + Phase 4 will introduce a real inventory schema. + +- ``knowledge_facts: list[{owner_id, target_id, fact}]`` — emits an + ``edge_update`` event on the directed ``owner_id -> target_id`` edge + with the fact appended to ``knowledge_facts``. The ``edge_update`` + projector accepts ``knowledge_facts`` as a list and extends the + edge's stored ``knowledge_json``. + +- ``relationship_change: {summary, source_id, target_id}`` — emits a + ``manual_edit`` with ``target_kind="edge_summary"`` overwriting the + edge's ``summary`` field on the directed pair. + +Anything else stays in the closed event record (the projector kept +the row; no further promotion). +""" + +from __future__ import annotations + +from sqlite3 import Connection + +from chat.eventlog.log import append_and_apply +from chat.state.events import get_event + + +def promote_completed_event( + conn: Connection, + *, + event_id: str, + chat_id: str, + chat_clock_at: str | None, +) -> dict: + """Read the completed event's props and emit promotion events. + + Returns a dict of counts keyed by promoted artifact: + ``{"acquired_objects", "knowledge_facts", "relationship_change"}``. + Skips silently if the event row is missing or its status is not + ``completed`` — cancelled / expired events terminate without any + promotion. + """ + counts = { + "acquired_objects": 0, + "knowledge_facts": 0, + "relationship_change": 0, + } + + event = get_event(conn, event_id) + if event is None or event["status"] != "completed": + return counts + + props = event.get("props") or {} + + # acquired_objects: each becomes a memory_pov_summary edit (Phase 3 + # stub). The manual_edit projector requires a valid memory rowid as + # ``target_id`` (it does ``int(target_id)``), so skip cleanly when + # neither a host_bot_id nor a host_memory_id is supplied. + host_bot_id = props.get("host_bot_id") + host_memory_id = props.get("host_memory_id") + for obj in props.get("acquired_objects", []) or []: + if host_bot_id is None or host_memory_id is None: + continue + append_and_apply( + conn, + kind="manual_edit", + payload={ + "target_kind": "memory_pov_summary", + "target_id": host_memory_id, + "owner_id": host_bot_id, + "chat_id": chat_id, + "prior_value": "", + "new_value": f"Acquired: {obj}", + "source": "event_promotion", + "event_id": event_id, + "chat_clock_at": chat_clock_at, + }, + ) + counts["acquired_objects"] += 1 + + # knowledge_facts: each becomes an edge_update appending the fact. + for fact_entry in props.get("knowledge_facts", []) or []: + owner_id = fact_entry.get("owner_id") + target_id = fact_entry.get("target_id") + fact = fact_entry.get("fact", "") + if not owner_id or not target_id or not fact: + continue + append_and_apply( + conn, + kind="edge_update", + payload={ + "source_id": owner_id, + "target_id": target_id, + "chat_id": chat_id, + "affinity_delta": 0, + "trust_delta": 0, + "knowledge_facts": [fact], + "last_interaction_at": chat_clock_at, + "last_interaction_chat_id": chat_id, + "source": "event_promotion", + "event_id": event_id, + }, + ) + counts["knowledge_facts"] += 1 + + # relationship_change: edge_summary manual_edit on the directed pair. + # The manual_edit projector for ``edge_summary`` keys on a + # ``target_id`` dict ``{source_id, target_id}`` (see + # chat/state/manual_edit.py); we shape the payload to match. + rc = props.get("relationship_change") or {} + if rc: + source_id = rc.get("source_id") + rc_target_id = rc.get("target_id") + summary = rc.get("summary", "") + if source_id and rc_target_id and summary: + append_and_apply( + conn, + kind="manual_edit", + payload={ + "target_kind": "edge_summary", + "target_id": { + "source_id": source_id, + "target_id": rc_target_id, + }, + "chat_id": chat_id, + "prior_value": "", + "new_value": summary, + "source": "event_promotion", + "event_id": event_id, + "chat_clock_at": chat_clock_at, + }, + ) + counts["relationship_change"] += 1 + + return counts + + +__all__ = ["promote_completed_event"] diff --git a/tests/test_event_promotion.py b/tests/test_event_promotion.py new file mode 100644 index 0000000..c812156 --- /dev/null +++ b/tests/test_event_promotion.py @@ -0,0 +1,256 @@ +"""Tests for the event-completion promotion service (T56). + +When an event reaches ``status='completed'``, the orchestrator promotes +structured artifacts the event carried (``acquired_objects``, +``knowledge_facts``, ``relationship_change``) into the appropriate +state stores via downstream events. Cancelled / expired events do NOT +promote — the closed event row is left in place but no follow-on +events fire. +""" + +from __future__ import annotations + +import json + +from chat.db.connection import open_db +from chat.db.migrate import apply_migrations +from chat.eventlog.log import append_event +from chat.eventlog.projector import project +from chat.services.event_promotion import promote_completed_event +from chat.state.edges import get_edge +import chat.state.edges # noqa: F401 - register edge_update handler +import chat.state.entities # noqa: F401 - register handlers +import chat.state.events # noqa: F401 - register events handlers +import chat.state.manual_edit # noqa: F401 - register manual_edit handler +import chat.state.world # noqa: F401 - register handlers + + +def _bot_payload(bot_id: str, name: str) -> dict: + return { + "id": bot_id, + "name": name, + "persona": "thoughtful, observant", + "voice_samples": [], + "traits": [], + "backstory": "", + "initial_relationship_to_you": "coworker", + "kickoff_prose": "", + } + + +def _chat_payload(chat_id: str = "chat_bot_a") -> dict: + return { + "id": chat_id, + "host_bot_id": "bot_a", + "guest_bot_id": "bot_b", + "initial_time": "2026-04-26T20:00:00+00:00", + "narrative_anchor": "Day 1 evening", + "weather": "clear", + } + + +def _seed_chat(conn) -> None: + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA")) + append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB")) + append_event(conn, kind="chat_created", payload=_chat_payload()) + + +def _seed_event( + conn, + *, + event_id: str, + props: dict, + terminal_kind: str = "event_completed", +) -> None: + """Append event_planned, then a terminal transition (default completed).""" + append_event( + conn, + kind="event_planned", + payload={ + "event_id": event_id, + "chat_id": "chat_bot_a", + "kind": "story_event", + "props": props, + "planned_for": "2026-04-30T18:00:00+00:00", + }, + ) + append_event( + conn, + kind=terminal_kind, + payload={ + "event_id": event_id, + "completed_at": "2026-04-30T20:00:00+00:00", + }, + ) + project(conn) + + +def _max_event_id(conn) -> int: + return conn.execute("SELECT COALESCE(MAX(id), 0) FROM event_log").fetchone()[0] + + +def _events_after(conn, after_id: int, kind: str) -> list[dict]: + rows = conn.execute( + "SELECT id, kind, payload_json FROM event_log " + "WHERE id > ? AND kind = ? ORDER BY id ASC", + (after_id, kind), + ).fetchall() + return [ + {"id": r[0], "kind": r[1], "payload": json.loads(r[2])} for r in rows + ] + + +def test_empty_props_no_op(tmp_path): + """Completed event with empty props produces no promotion events.""" + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + _seed_event(conn, event_id="evt_empty", props={}) + + before = _max_event_id(conn) + counts = promote_completed_event( + conn, + event_id="evt_empty", + chat_id="chat_bot_a", + chat_clock_at="2026-04-30T20:00:00+00:00", + ) + + assert counts == { + "acquired_objects": 0, + "knowledge_facts": 0, + "relationship_change": 0, + } + # No new edge_update or manual_edit rows after the promote call. + assert _events_after(conn, before, "edge_update") == [] + assert _events_after(conn, before, "manual_edit") == [] + + +def test_knowledge_facts_emits_edge_update(tmp_path): + """A knowledge_facts entry promotes to an edge_update on the directed edge.""" + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + _seed_event( + conn, + event_id="evt_kf", + props={ + "knowledge_facts": [ + { + "owner_id": "bot_a", + "target_id": "you", + "fact": "Maya prefers tea over coffee", + } + ] + }, + ) + + before = _max_event_id(conn) + counts = promote_completed_event( + conn, + event_id="evt_kf", + chat_id="chat_bot_a", + chat_clock_at="2026-04-30T20:00:00+00:00", + ) + + assert counts["knowledge_facts"] == 1 + assert counts["acquired_objects"] == 0 + assert counts["relationship_change"] == 0 + + # An edge_update event landed in the event_log AFTER the promote call. + new_edge_updates = _events_after(conn, before, "edge_update") + assert len(new_edge_updates) == 1 + payload = new_edge_updates[0]["payload"] + assert payload["source_id"] == "bot_a" + assert payload["target_id"] == "you" + assert payload["knowledge_facts"] == ["Maya prefers tea over coffee"] + + # And the projected edge has the fact applied. + edge = get_edge(conn, "bot_a", "you") + assert edge is not None + assert "Maya prefers tea over coffee" in edge["knowledge"] + + +def test_relationship_change_emits_manual_edit(tmp_path): + """A relationship_change promotes to a manual_edit edge_summary.""" + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + _seed_event( + conn, + event_id="evt_rc", + props={ + "relationship_change": { + "source_id": "bot_a", + "target_id": "you", + "summary": "they're now dating", + } + }, + ) + + before = _max_event_id(conn) + counts = promote_completed_event( + conn, + event_id="evt_rc", + chat_id="chat_bot_a", + chat_clock_at="2026-04-30T20:00:00+00:00", + ) + + assert counts["relationship_change"] == 1 + assert counts["knowledge_facts"] == 0 + assert counts["acquired_objects"] == 0 + + new_manual_edits = _events_after(conn, before, "manual_edit") + # Filter to edge_summary only — Phase 3 stub may also emit + # memory_pov_summary entries for acquired_objects, but here there + # are none. + edge_summary_edits = [ + m for m in new_manual_edits + if m["payload"].get("target_kind") == "edge_summary" + ] + assert len(edge_summary_edits) == 1 + payload = edge_summary_edits[0]["payload"] + assert payload["target_kind"] == "edge_summary" + assert payload["target_id"] == {"source_id": "bot_a", "target_id": "you"} + assert payload["new_value"] == "they're now dating" + + +def test_cancelled_event_does_not_promote(tmp_path): + """Cancelled events have promotable props ignored — no follow-on events.""" + db = tmp_path / "t.db" + apply_migrations(db) + with open_db(db) as conn: + _seed_chat(conn) + _seed_event( + conn, + event_id="evt_canx", + props={ + "knowledge_facts": [ + {"owner_id": "bot_a", "target_id": "you", "fact": "x"} + ], + "relationship_change": { + "source_id": "bot_a", + "target_id": "you", + "summary": "ignored", + }, + }, + terminal_kind="event_cancelled", + ) + + before = _max_event_id(conn) + counts = promote_completed_event( + conn, + event_id="evt_canx", + chat_id="chat_bot_a", + chat_clock_at="2026-04-30T20:00:00+00:00", + ) + + assert counts == { + "acquired_objects": 0, + "knowledge_facts": 0, + "relationship_change": 0, + } + assert _events_after(conn, before, "edge_update") == [] + assert _events_after(conn, before, "manual_edit") == []