Files
chat/chat/state/edges.py
T
2026-04-26 11:51:15 -04:00

85 lines
2.8 KiB
Python

from __future__ import annotations
import json
from sqlite3 import Connection
from chat.eventlog.projector import on
from chat.eventlog.log import Event
def _clamp(value: int, lo: int = 0, hi: int = 100) -> int:
return max(lo, min(hi, value))
@on("edge_update")
def _apply_edge_update(conn: Connection, e: Event) -> None:
p = e.payload
source_id = p["source_id"]
target_id = p["target_id"]
chat_id = p.get("chat_id")
# Upsert: ensure a row exists with defaults, then apply deltas.
conn.execute(
"INSERT OR IGNORE INTO edges (chat_id, source_id, target_id) VALUES (?, ?, ?)",
(chat_id, source_id, target_id),
)
row = conn.execute(
"SELECT affinity, trust, knowledge_json, last_interaction_chat_id, last_interaction_at "
"FROM edges WHERE source_id = ? AND target_id = ?",
(source_id, target_id),
).fetchone()
affinity, trust, knowledge_json, last_chat_id, last_at = row
affinity_delta = int(p.get("affinity_delta", 0))
trust_delta = int(p.get("trust_delta", 0))
new_affinity = _clamp(affinity + affinity_delta)
new_trust = _clamp(trust + trust_delta)
new_facts = p.get("knowledge_facts") or []
if new_facts:
knowledge = json.loads(knowledge_json)
knowledge.extend(new_facts)
knowledge_json = json.dumps(knowledge)
payload_at = p.get("last_interaction_at")
payload_chat_id = p.get("last_interaction_chat_id")
if payload_at is not None:
last_at = payload_at
if payload_chat_id is not None:
last_chat_id = payload_chat_id
conn.execute(
"UPDATE edges SET affinity = ?, trust = ?, knowledge_json = ?, "
"last_interaction_chat_id = ?, last_interaction_at = ? "
"WHERE source_id = ? AND target_id = ?",
(new_affinity, new_trust, knowledge_json, last_chat_id, last_at,
source_id, target_id),
)
def get_edge(conn: Connection, source_id: str, target_id: str) -> dict | None:
row = conn.execute(
"SELECT * FROM edges WHERE source_id = ? AND target_id = ?",
(source_id, target_id),
).fetchone()
if not row:
return None
cols = [c[1] for c in conn.execute("PRAGMA table_info(edges)").fetchall()]
d = dict(zip(cols, row))
d["knowledge"] = json.loads(d.pop("knowledge_json"))
return d
def list_edges_for(conn: Connection, source_id: str) -> list[dict]:
cur = conn.execute(
"SELECT * FROM edges WHERE source_id = ? ORDER BY target_id",
(source_id,),
)
rows = cur.fetchall()
cols = [c[1] for c in conn.execute("PRAGMA table_info(edges)").fetchall()]
out: list[dict] = []
for row in rows:
d = dict(zip(cols, row))
d["knowledge"] = json.loads(d.pop("knowledge_json"))
out.append(d)
return out