128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
from __future__ import annotations
|
|
import json
|
|
from sqlite3 import Connection
|
|
|
|
from chat.eventlog.projector import on
|
|
from chat.eventlog.log import Event
|
|
|
|
|
|
# Statuses treated as final: the UPDATE handlers in this module refuse to
# move an event out of any of these three values.
# NOTE(review): the handlers hard-code the same three literals in their SQL
# WHERE clauses rather than reading this set, and nothing in the visible
# code references it -- keep the two in sync if a status is ever added.
_TERMINAL_STATUSES = {"completed", "cancelled", "expired"}
|
|
|
|
|
|
@on("event_planned")
def _apply_event_planned(conn: Connection, e: Event) -> None:
    """Project an ``event_planned`` log entry into the ``events`` table.

    Inserts one row whose status is the literal ``'planned'``.

    Payload fields read from ``e.payload``:
      * ``event_id``, ``chat_id``, ``kind`` -- indexed directly, so they
        must be present.
      * ``props`` -- optional; an empty dict is serialized when absent.
      * ``planned_for`` -- optional; ``None`` is bound when absent.
    """
    payload = e.payload
    row_values = (
        payload["event_id"],
        payload["chat_id"],
        payload["kind"],
        json.dumps(payload.get("props", {})),
        payload.get("planned_for"),
    )
    # OR IGNORE: SQLite skips the insert instead of raising when it would
    # violate a constraint, so an already-present row is left untouched.
    # NOTE(review): presumably this relies on a uniqueness constraint on
    # event_id -- the table schema is not visible here; confirm.
    insert_sql = (
        "INSERT OR IGNORE INTO events "
        "(event_id, chat_id, kind, status, props_json, planned_for) "
        "VALUES (?, ?, ?, 'planned', ?, ?)"
    )
    conn.execute(insert_sql, row_values)
|
|
|
|
|
|
@on("event_started")
def _apply_event_started(conn: Connection, e: Event) -> None:
    """Project an ``event_started`` log entry: mark the event ``'active'``.

    Sets ``status`` to ``'active'``, ``started_at`` to the payload's
    ``started_at`` value (``None`` when the key is absent) and refreshes
    ``updated_at`` with SQLite's ``datetime('now')``.

    The payload's ``event_id`` is indexed directly, so it must be present.
    """
    payload = e.payload
    bindings = (payload.get("started_at"), payload["event_id"])
    # The WHERE clause excludes rows already in a terminal status, so
    # re-applying this entry never reopens a finished event.
    update_sql = (
        "UPDATE events SET status = 'active', started_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    conn.execute(update_sql, bindings)
|
|
|
|
|
|
@on("event_completed")
def _apply_event_completed(conn: Connection, e: Event) -> None:
    """Project an ``event_completed`` log entry: mark the event ``'completed'``.

    Sets ``status`` to ``'completed'``, ``completed_at`` to the payload's
    ``completed_at`` value (``None`` when the key is absent) and refreshes
    ``updated_at`` with SQLite's ``datetime('now')``.

    The payload's ``event_id`` is indexed directly, so it must be present.
    """
    payload = e.payload
    bindings = (payload.get("completed_at"), payload["event_id"])
    # Rows already in 'completed', 'cancelled' or 'expired' are filtered
    # out by the WHERE clause and therefore stay as they are.
    update_sql = (
        "UPDATE events SET status = 'completed', completed_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    conn.execute(update_sql, bindings)
|
|
|
|
|
|
@on("event_cancelled")
def _apply_event_cancelled(conn: Connection, e: Event) -> None:
    """Project an ``event_cancelled`` log entry: mark the event ``'cancelled'``.

    Sets ``status`` to ``'cancelled'`` and refreshes ``updated_at`` with
    SQLite's ``datetime('now')``. The cancellation time is taken from the
    payload's ``completed_at`` key (``None`` when absent) and stored in the
    ``completed_at`` column -- this handler writes no separate
    cancellation-time column.

    The payload's ``event_id`` is indexed directly, so it must be present.
    """
    payload = e.payload
    bindings = (payload.get("completed_at"), payload["event_id"])
    # Rows already in 'completed', 'cancelled' or 'expired' are filtered
    # out by the WHERE clause and therefore stay as they are.
    update_sql = (
        "UPDATE events SET status = 'cancelled', completed_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    conn.execute(update_sql, bindings)
|
|
|
|
|
|
@on("event_expired")
def _apply_event_expired(conn: Connection, e: Event) -> None:
    """Project an ``event_expired`` log entry: mark the event ``'expired'``.

    Sets ``status`` to ``'expired'`` and refreshes ``updated_at`` with
    SQLite's ``datetime('now')``. The expiry time is taken from the
    payload's ``completed_at`` key (``None`` when absent) and stored in the
    ``completed_at`` column -- this handler writes no separate expiry-time
    column.

    The payload's ``event_id`` is indexed directly, so it must be present.
    """
    payload = e.payload
    bindings = (payload.get("completed_at"), payload["event_id"])
    # Rows already in 'completed', 'cancelled' or 'expired' are filtered
    # out by the WHERE clause and therefore stay as they are.
    update_sql = (
        "UPDATE events SET status = 'expired', completed_at = ?, updated_at = datetime('now') "
        "WHERE event_id = ? AND status NOT IN ('completed','cancelled','expired')"
    )
    conn.execute(update_sql, bindings)
|
|
|
|
|
|
def get_event(conn: Connection, event_id: str) -> dict | None:
    """Look up a single event by its ``event_id``.

    Args:
        conn: Open SQLite connection holding the ``events`` table.
        event_id: Identifier matched against the ``event_id`` column.

    Returns:
        ``None`` when no row matches. Otherwise a dict with the keys
        ``event_id``, ``chat_id``, ``kind``, ``status``, ``props``,
        ``planned_for``, ``started_at``, ``completed_at``, ``created_at``
        and ``updated_at``; ``props`` is the JSON-decoded ``props_json``
        column.
    """
    cursor = conn.execute(
        "SELECT event_id, chat_id, kind, status, props_json, planned_for, "
        "started_at, completed_at, created_at, updated_at "
        "FROM events WHERE event_id = ?",
        (event_id,),
    )
    record = cursor.fetchone()
    if not record:
        return None

    # Column order below mirrors the SELECT list above.
    (
        row_event_id,
        row_chat_id,
        kind,
        status,
        props_json,
        planned_for,
        started_at,
        completed_at,
        created_at,
        updated_at,
    ) = record
    return {
        "event_id": row_event_id,
        "chat_id": row_chat_id,
        "kind": kind,
        "status": status,
        "props": json.loads(props_json),
        "planned_for": planned_for,
        "started_at": started_at,
        "completed_at": completed_at,
        "created_at": created_at,
        "updated_at": updated_at,
    }
|
|
|
|
|
|
def list_active_events(conn: Connection, chat_id: str) -> list[dict]:
    """Return the ``planned`` and ``active`` events of one chat.

    Args:
        conn: Open SQLite connection holding the ``events`` table.
        chat_id: Value matched against the ``chat_id`` column.

    Returns:
        One dict per matching row, ordered by the table's ``id`` column
        ascending. Each dict has the keys ``event_id``, ``chat_id``,
        ``kind``, ``status``, ``props`` (JSON-decoded ``props_json``),
        ``planned_for``, ``started_at``, ``completed_at``, ``created_at``
        and ``updated_at``. The list is empty when nothing matches.
    """
    query = (
        "SELECT event_id, chat_id, kind, status, props_json, planned_for, "
        "started_at, completed_at, created_at, updated_at "
        "FROM events WHERE chat_id = ? AND status IN ('planned','active') "
        "ORDER BY id ASC"
    )
    events = []
    for record in conn.execute(query, (chat_id,)).fetchall():
        # Positional indexes follow the SELECT column order.
        events.append(
            {
                "event_id": record[0],
                "chat_id": record[1],
                "kind": record[2],
                "status": record[3],
                "props": json.loads(record[4]),
                "planned_for": record[5],
                "started_at": record[6],
                "completed_at": record[7],
                "created_at": record[8],
                "updated_at": record[9],
            }
        )
    return events
|
|
|
|
|
|
def list_events_in_status(conn: Connection, chat_id: str, status: str) -> list[dict]:
    """Return the events of one chat whose status equals ``status``.

    Args:
        conn: Open SQLite connection holding the ``events`` table.
        chat_id: Value matched against the ``chat_id`` column.
        status: Value matched exactly against the ``status`` column.

    Returns:
        One dict per matching row, ordered by the table's ``id`` column
        ascending. Each dict has the keys ``event_id``, ``chat_id``,
        ``kind``, ``status``, ``props`` (JSON-decoded ``props_json``),
        ``planned_for``, ``started_at``, ``completed_at``, ``created_at``
        and ``updated_at``. The list is empty when nothing matches.
    """

    def as_event(record) -> dict:
        # Positional indexes follow the SELECT column order below.
        return {
            "event_id": record[0],
            "chat_id": record[1],
            "kind": record[2],
            "status": record[3],
            "props": json.loads(record[4]),
            "planned_for": record[5],
            "started_at": record[6],
            "completed_at": record[7],
            "created_at": record[8],
            "updated_at": record[9],
        }

    cursor = conn.execute(
        "SELECT event_id, chat_id, kind, status, props_json, planned_for, "
        "started_at, completed_at, created_at, updated_at "
        "FROM events WHERE chat_id = ? AND status = ? ORDER BY id ASC",
        (chat_id, status),
    )
    matching = cursor.fetchall()
    return list(map(as_event, matching))
|