Files
chat/tests/test_events_state.py
2026-04-26 20:04:36 -04:00

236 lines
7.2 KiB
Python

from __future__ import annotations
from chat.db.connection import open_db
from chat.db.migrate import apply_migrations
from chat.eventlog.log import append_and_apply, append_event
from chat.eventlog.projector import project
import chat.state.entities # registers handlers
import chat.state.world # registers handlers
import chat.state.group_node # registers handlers
import chat.state.events # registers handlers
from chat.state.events import (
get_event,
list_active_events,
list_events_in_status,
)
def _bot_payload(bot_id: str, name: str) -> dict:
return {
"id": bot_id,
"name": name,
"persona": "thoughtful, observant",
"voice_samples": [],
"traits": [],
"backstory": "",
"initial_relationship_to_you": "coworker",
"kickoff_prose": "",
}
def _chat_payload(chat_id: str = "chat_bot_a") -> dict:
return {
"id": chat_id,
"host_bot_id": "bot_a",
"guest_bot_id": "bot_b",
"initial_time": "2026-04-26T20:00:00+00:00",
"narrative_anchor": "Day 1 evening",
"weather": "clear",
}
def _seed_chat(conn) -> None:
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_a", "BotA"))
append_event(conn, kind="bot_authored", payload=_bot_payload("bot_b", "BotB"))
append_event(conn, kind="chat_created", payload=_chat_payload())
def test_event_planned_creates_row(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
append_event(
conn,
kind="event_planned",
payload={
"event_id": "evt_abc",
"chat_id": "chat_bot_a",
"kind": "date_at_park",
"props": {"location": "park"},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
project(conn)
ev = get_event(conn, "evt_abc")
assert ev is not None
assert ev["event_id"] == "evt_abc"
assert ev["chat_id"] == "chat_bot_a"
assert ev["kind"] == "date_at_park"
assert ev["status"] == "planned"
assert ev["props"]["location"] == "park"
assert ev["planned_for"] == "2026-04-30T18:00:00+00:00"
assert ev["started_at"] is None
assert ev["completed_at"] is None
def test_event_started_then_completed_updates_status(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
append_event(
conn,
kind="event_planned",
payload={
"event_id": "evt_abc",
"chat_id": "chat_bot_a",
"kind": "date_at_park",
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
append_event(
conn,
kind="event_started",
payload={
"event_id": "evt_abc",
"started_at": "2026-04-30T18:01:00+00:00",
},
)
append_event(
conn,
kind="event_completed",
payload={
"event_id": "evt_abc",
"completed_at": "2026-04-30T20:00:00+00:00",
},
)
project(conn)
ev = get_event(conn, "evt_abc")
assert ev is not None
assert ev["status"] == "completed"
assert ev["started_at"] == "2026-04-30T18:01:00+00:00"
assert ev["completed_at"] == "2026-04-30T20:00:00+00:00"
def test_event_cancelled_terminal_subsequent_transitions_ignored(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
append_event(
conn,
kind="event_planned",
payload={
"event_id": "evt_abc",
"chat_id": "chat_bot_a",
"kind": "date_at_park",
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
append_event(
conn,
kind="event_cancelled",
payload={
"event_id": "evt_abc",
"completed_at": "2026-04-30T17:00:00+00:00",
},
)
project(conn)
ev = get_event(conn, "evt_abc")
assert ev is not None
assert ev["status"] == "cancelled"
assert ev["completed_at"] == "2026-04-30T17:00:00+00:00"
# Subsequent event_started must be no-oped because status is terminal.
# Use append_and_apply so we apply ONLY this new event without
# replaying earlier non-idempotent handlers (e.g. chat_created).
append_and_apply(
conn,
kind="event_started",
payload={
"event_id": "evt_abc",
"started_at": "2026-04-30T18:01:00+00:00",
},
)
ev2 = get_event(conn, "evt_abc")
assert ev2 is not None
assert ev2["status"] == "cancelled"
assert ev2["started_at"] is None
# completed_at unchanged from the cancelled transition
assert ev2["completed_at"] == "2026-04-30T17:00:00+00:00"
def test_list_active_events_filters_to_planned_and_active(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_chat(conn)
# Four events: one planned, one active, one completed, one cancelled.
for ev_id, kind in [
("evt_planned", "date_at_park"),
("evt_active", "movie_night"),
("evt_done", "dinner"),
("evt_canx", "trip"),
]:
append_event(
conn,
kind="event_planned",
payload={
"event_id": ev_id,
"chat_id": "chat_bot_a",
"kind": kind,
"props": {},
"planned_for": "2026-04-30T18:00:00+00:00",
},
)
append_event(
conn,
kind="event_started",
payload={
"event_id": "evt_active",
"started_at": "2026-04-30T18:01:00+00:00",
},
)
append_event(
conn,
kind="event_started",
payload={
"event_id": "evt_done",
"started_at": "2026-04-30T18:01:00+00:00",
},
)
append_event(
conn,
kind="event_completed",
payload={
"event_id": "evt_done",
"completed_at": "2026-04-30T20:00:00+00:00",
},
)
append_event(
conn,
kind="event_cancelled",
payload={
"event_id": "evt_canx",
"completed_at": "2026-04-30T17:00:00+00:00",
},
)
project(conn)
active = list_active_events(conn, "chat_bot_a")
active_ids = {e["event_id"] for e in active}
assert active_ids == {"evt_planned", "evt_active"}
completed = list_events_in_status(conn, "chat_bot_a", "completed")
assert [e["event_id"] for e in completed] == ["evt_done"]
cancelled = list_events_in_status(conn, "chat_bot_a", "cancelled")
assert [e["event_id"] for e in cancelled] == ["evt_canx"]