Files
chat/chat/state/world.py
T

203 lines
5.9 KiB
Python

from __future__ import annotations
import json
from sqlite3 import Connection
from chat.eventlog.projector import on
from chat.eventlog.log import Event
def _row_to_dict(conn: Connection, table: str, row: tuple) -> dict:
    """Pair the values of ``row`` with the column names of ``table``.

    Column names are looked up with ``PRAGMA table_info`` on every call and
    matched to ``row`` positionally, so ``row`` is expected to be in the
    table's column order (the callers in this module pass ``SELECT *`` rows).
    Pairing uses ``zip``: if ``row`` and the column list differ in length, the
    surplus on the longer side is dropped.

    ``table`` is interpolated into the statement text as-is, so it must be a
    trusted identifier, never user input.
    """
    table_info = conn.execute(f"PRAGMA table_info({table})").fetchall()
    # Field 1 of each table_info record is the column name.
    column_names = [info[1] for info in table_info]
    return dict(zip(column_names, row))
@on("chat_created")
def _apply_chat_created(conn: Connection, e: Event) -> None:
    """Project a ``chat_created`` event into ``chats`` and ``chat_state``.

    Payload keys read:
        id, host_bot_id, initial_time -- required (``KeyError`` if absent).
        guest_bot_id -- optional, stored as NULL when absent.
        weather, narrative_anchor -- optional, default to ``""``.

    The ``chat_state`` row is created with ``active_scene_id`` set to NULL.
    """
    payload = e.payload

    chat_values = (
        payload["id"],
        payload["host_bot_id"],
        payload.get("guest_bot_id"),
    )
    conn.execute(
        "INSERT INTO chats (id, host_bot_id, guest_bot_id) VALUES (?, ?, ?)",
        chat_values,
    )

    # Built only after the first insert has run, so a missing ``initial_time``
    # surfaces at the same point it always did.
    state_values = (
        payload["id"],
        payload["initial_time"],
        payload.get("weather", ""),
        payload.get("narrative_anchor", ""),
    )
    conn.execute(
        "INSERT INTO chat_state "
        "(chat_id, time, weather, active_scene_id, narrative_anchor) "
        "VALUES (?, ?, ?, NULL, ?)",
        state_values,
    )
@on("container_created")
def _apply_container_created(conn: Connection, e: Event) -> None:
    """Project a ``container_created`` event into the ``containers`` table.

    Payload keys read:
        chat_id, name, type -- required (``KeyError`` if absent).
        properties -- optional, defaults to ``{}``; stored JSON-encoded in
            ``properties_json``.
        parent_id -- optional, stored as NULL when absent.
    """
    payload = e.payload
    insert_sql = (
        "INSERT INTO containers "
        "(chat_id, name, type, properties_json, parent_id) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    values = (
        payload["chat_id"],
        payload["name"],
        payload["type"],
        json.dumps(payload.get("properties", {})),
        payload.get("parent_id"),
    )
    conn.execute(insert_sql, values)
@on("activity_change")
def _apply_activity_change(conn: Connection, e: Event) -> None:
    """Project an ``activity_change`` event into the ``activity`` table.

    The row is written with ``INSERT OR REPLACE``, so every column is taken
    from this event; fields the payload omits fall back to the defaults below
    rather than keeping a previously stored value.

    Payload keys read:
        entity_id -- required (``KeyError`` if absent).
        container_id, slot -- optional, NULL when absent.
        posture, attention -- optional, default ``""``.
        action, status -- optional, default ``{}``; stored JSON-encoded.
        holding -- optional, default ``[]``; stored JSON-encoded.

    ``updated_at`` is filled in by SQLite's ``datetime('now')`` when the
    statement runs.
    """
    payload = e.payload
    upsert_sql = (
        "INSERT OR REPLACE INTO activity ("
        "entity_id, container_id, slot, posture, action_json, "
        "attention, holding_json, status_json, updated_at"
        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))"
    )
    values = (
        payload["entity_id"],
        payload.get("container_id"),
        payload.get("slot"),
        payload.get("posture", ""),
        json.dumps(payload.get("action", {})),
        payload.get("attention", ""),
        json.dumps(payload.get("holding", [])),
        json.dumps(payload.get("status", {})),
    )
    conn.execute(upsert_sql, values)
@on("scene_opened")
def _apply_scene_opened(conn: Connection, e: Event) -> None:
    """Project a ``scene_opened`` event: insert the scene and mark it active.

    Payload keys read:
        chat_id, started_at -- required (``KeyError`` if absent).
        container_id -- optional, NULL when absent.
        participants -- optional, defaults to ``[]``; stored JSON-encoded.

    The scene is inserted with ``ended_at`` NULL and ``significance`` 0, and
    the chat's ``chat_state.active_scene_id`` is then pointed at the new row.
    """
    payload = e.payload
    chat_id = payload["chat_id"]

    insert_sql = (
        "INSERT INTO scenes (chat_id, container_id, started_at, ended_at, "
        "significance, participants_json) VALUES (?, ?, ?, NULL, 0, ?)"
    )
    cursor = conn.execute(
        insert_sql,
        (
            chat_id,
            payload.get("container_id"),
            payload["started_at"],
            json.dumps(payload.get("participants", [])),
        ),
    )

    # The freshly inserted row's id becomes the chat's active scene.
    conn.execute(
        "UPDATE chat_state SET active_scene_id = ? WHERE chat_id = ?",
        (cursor.lastrowid, chat_id),
    )
@on("scene_closed")
def _apply_scene_closed(conn: Connection, e: Event) -> None:
    """Project a ``scene_closed`` event: finalize the scene and release it.

    Payload keys read:
        scene_id, ended_at -- required (``KeyError`` if absent).
        significance -- optional, defaults to 0; coerced with ``int()``.

    The scene row gets its ``ended_at`` and ``significance`` written. If the
    scene exists, its chat's ``active_scene_id`` is reset to NULL, but only
    when it currently points at this scene. Closing a scene that is not the
    active one (a late or repeated close after another scene was opened) must
    not wipe the pointer to the scene that is still open.

    An unknown ``scene_id`` is a no-op beyond the (non-matching) UPDATE.
    """
    p = e.payload
    scene_id = p["scene_id"]
    significance = int(p.get("significance", 0))

    conn.execute(
        "UPDATE scenes SET ended_at = ?, significance = ? WHERE id = ?",
        (p["ended_at"], significance, scene_id),
    )

    row = conn.execute(
        "SELECT id, chat_id FROM scenes WHERE id = ?", (scene_id,)
    ).fetchone()
    if row is None:
        return

    # Compare against the id as stored in ``scenes`` rather than the raw
    # payload value, so the match does not depend on the payload's type.
    stored_scene_id, chat_id = row[0], row[1]
    conn.execute(
        "UPDATE chat_state SET active_scene_id = NULL "
        "WHERE chat_id = ? AND active_scene_id = ?",
        (chat_id, stored_scene_id),
    )
def _chat_select_columns() -> str:
    """Return the comma-separated SELECT list for a chat joined to its state.

    Columns are qualified with the aliases ``c`` (chats) and ``s``
    (chat_state); their order is the positional layout that
    ``_chat_row_to_dict`` decodes.
    """
    chat_fields = ("id", "host_bot_id", "guest_bot_id", "created_at")
    state_fields = ("time", "weather", "active_scene_id", "narrative_anchor")

    qualified = [f"c.{field}" for field in chat_fields]
    qualified.extend(f"s.{field}" for field in state_fields)
    return ", ".join(qualified)
def _chat_row_to_dict(row: tuple) -> dict:
    """Convert a positional chat/chat_state row into a keyed dict.

    ``row`` must follow the column order of ``_chat_select_columns``. Values
    are read by index, so a row with fewer than eight items raises
    ``IndexError`` and any items past the eighth are ignored.
    """
    keys = (
        "id",
        "host_bot_id",
        "guest_bot_id",
        "created_at",
        "time",
        "weather",
        "active_scene_id",
        "narrative_anchor",
    )
    return {key: row[position] for position, key in enumerate(keys)}
def get_chat(conn: Connection, chat_id: str) -> dict | None:
    """Fetch one chat joined with its ``chat_state`` row.

    Returns the dict produced by ``_chat_row_to_dict``, or ``None`` when no
    joined row exists for ``chat_id``.
    """
    query = (
        f"SELECT {_chat_select_columns()} FROM chats c "
        "JOIN chat_state s ON s.chat_id = c.id WHERE c.id = ?"
    )
    found = conn.execute(query, (chat_id,)).fetchone()
    return _chat_row_to_dict(found) if found else None
def list_chats(conn: Connection) -> list[dict]:
    """Return every chat joined with its ``chat_state`` row, ordered by chat id.

    Each element has the shape produced by ``_chat_row_to_dict``; the list is
    empty when there are no joined rows.
    """
    query = (
        f"SELECT {_chat_select_columns()} FROM chats c "
        "JOIN chat_state s ON s.chat_id = c.id ORDER BY c.id"
    )
    records = conn.execute(query).fetchall()
    return list(map(_chat_row_to_dict, records))
def get_container(conn: Connection, container_id: int) -> dict | None:
    """Look up a container by primary key.

    Returns the row as a dict with ``properties_json`` replaced by a decoded
    ``properties`` entry, or ``None`` when no such container exists.
    """
    record = conn.execute(
        "SELECT * FROM containers WHERE id = ?", (container_id,)
    ).fetchone()
    if not record:
        return None

    container = _row_to_dict(conn, "containers", record)
    # Swap the stored JSON text for its decoded value.
    encoded_properties = container.pop("properties_json")
    container["properties"] = json.loads(encoded_properties)
    return container
def find_container(conn: Connection, chat_id: str, name: str) -> dict | None:
    """Look up a container by name within a chat.

    Returns the first matching row as a dict with ``properties_json`` replaced
    by a decoded ``properties`` entry, or ``None`` when nothing matches. The
    query has no ORDER BY, so if several containers share the name the one
    returned is whichever SQLite yields first.
    """
    record = conn.execute(
        "SELECT * FROM containers WHERE chat_id = ? AND name = ?",
        (chat_id, name),
    ).fetchone()
    if not record:
        return None

    container = _row_to_dict(conn, "containers", record)
    # Swap the stored JSON text for its decoded value.
    encoded_properties = container.pop("properties_json")
    container["properties"] = json.loads(encoded_properties)
    return container
def get_activity(conn: Connection, entity_id: str) -> dict | None:
    """Fetch the activity row for ``entity_id``.

    Returns the row as a dict in which the ``action_json``, ``holding_json``
    and ``status_json`` columns are replaced by decoded ``action``,
    ``holding`` and ``status`` entries, or ``None`` when the entity has no
    activity row.
    """
    record = conn.execute(
        "SELECT * FROM activity WHERE entity_id = ?", (entity_id,)
    ).fetchone()
    if not record:
        return None

    activity = _row_to_dict(conn, "activity", record)
    # Decode each JSON column in turn, dropping the raw ``*_json`` key.
    for field in ("action", "holding", "status"):
        activity[field] = json.loads(activity.pop(f"{field}_json"))
    return activity
def get_scene(conn: Connection, scene_id: int) -> dict | None:
    """Look up a scene by primary key.

    Returns the row as a dict with ``participants_json`` replaced by a decoded
    ``participants`` entry, or ``None`` when no such scene exists.
    """
    record = conn.execute(
        "SELECT * FROM scenes WHERE id = ?", (scene_id,)
    ).fetchone()
    if not record:
        return None

    scene = _row_to_dict(conn, "scenes", record)
    # Swap the stored JSON text for its decoded value.
    encoded_participants = scene.pop("participants_json")
    scene["participants"] = json.loads(encoded_participants)
    return scene
def active_scene(conn: Connection, chat_id: str) -> dict | None:
    """Return the chat's currently active scene, if any.

    Reads ``chat_state.active_scene_id`` for ``chat_id`` and resolves it via
    ``get_scene``. Returns ``None`` when the chat has no state row, when the
    pointer is NULL, or when ``get_scene`` finds no scene with that id.
    """
    state = conn.execute(
        "SELECT active_scene_id FROM chat_state WHERE chat_id = ?",
        (chat_id,),
    ).fetchone()
    scene_id = state[0] if state else None
    if scene_id is None:
        return None
    return get_scene(conn, scene_id)