85 lines
2.8 KiB
Python
85 lines
2.8 KiB
Python
from __future__ import annotations
|
|
import json
|
|
from sqlite3 import Connection
|
|
from chat.eventlog.projector import on
|
|
from chat.eventlog.log import Event
|
|
|
|
|
|
def _clamp(value: int, lo: int = 0, hi: int = 100) -> int:
    """Return ``value`` limited to the range ``lo``..``hi`` (both inclusive).

    The upper bound is applied first and the lower bound second, so ``lo``
    wins if the two bounds are ever passed in crossed order.
    """
    capped = value if value < hi else hi
    return capped if capped > lo else lo
|
|
|
|
|
|
@on("edge_update")
def _apply_edge_update(conn: Connection, e: Event) -> None:
    """Project an ``edge_update`` event onto the ``edges`` table.

    Ensures a row exists for the (source, target) pair, then applies the
    payload's affinity/trust deltas (each result passed through ``_clamp``),
    appends any new knowledge facts to the stored JSON list, and overwrites
    the last-interaction fields when the payload supplies them. No commit is
    issued here.

    Payload keys read from ``e.payload``:
        ``source_id``, ``target_id`` (required); ``chat_id``,
        ``affinity_delta``, ``trust_delta``, ``knowledge_facts``,
        ``last_interaction_at``, ``last_interaction_chat_id`` (optional).

    Raises:
        KeyError: if ``source_id`` or ``target_id`` is missing from the payload.
    """
    p = e.payload
    source_id = p["source_id"]
    target_id = p["target_id"]
    chat_id = p.get("chat_id")

    # Upsert: ensure a row exists with defaults, then apply deltas.
    # NOTE(review): the insert carries chat_id, but the SELECT/UPDATE below
    # match on (source_id, target_id) only -- presumably that pair is the
    # table's unique key; confirm against the schema.
    conn.execute(
        "INSERT OR IGNORE INTO edges (chat_id, source_id, target_id) VALUES (?, ?, ?)",
        (chat_id, source_id, target_id),
    )

    row = conn.execute(
        "SELECT affinity, trust, knowledge_json, last_interaction_chat_id, last_interaction_at "
        "FROM edges WHERE source_id = ? AND target_id = ?",
        (source_id, target_id),
    ).fetchone()
    affinity, trust, knowledge_json, last_chat_id, last_at = row

    # `or 0` treats an explicit null delta like a missing one (int(None) would
    # raise), mirroring the `or []` handling of knowledge_facts below.
    affinity_delta = int(p.get("affinity_delta") or 0)
    trust_delta = int(p.get("trust_delta") or 0)
    new_affinity = _clamp(affinity + affinity_delta)
    new_trust = _clamp(trust + trust_delta)

    # Only re-serialize the knowledge list when there is something to append.
    new_facts = p.get("knowledge_facts") or []
    if new_facts:
        knowledge = json.loads(knowledge_json)
        knowledge.extend(new_facts)
        knowledge_json = json.dumps(knowledge)

    # Last-interaction fields keep their stored values unless the payload
    # provides a non-null replacement.
    payload_at = p.get("last_interaction_at")
    payload_chat_id = p.get("last_interaction_chat_id")
    if payload_at is not None:
        last_at = payload_at
    if payload_chat_id is not None:
        last_chat_id = payload_chat_id

    conn.execute(
        "UPDATE edges SET affinity = ?, trust = ?, knowledge_json = ?, "
        "last_interaction_chat_id = ?, last_interaction_at = ? "
        "WHERE source_id = ? AND target_id = ?",
        (new_affinity, new_trust, knowledge_json, last_chat_id, last_at,
         source_id, target_id),
    )
|
|
|
|
|
|
def get_edge(conn: Connection, source_id: str, target_id: str) -> dict | None:
    """Fetch one edge row as a dict, or ``None`` when no such edge exists.

    Args:
        conn: Open SQLite connection holding the ``edges`` table.
        source_id: Value matched against the ``source_id`` column.
        target_id: Value matched against the ``target_id`` column.

    Returns:
        A dict keyed by column name in which the raw ``knowledge_json`` column
        is replaced by its decoded value under the key ``knowledge``; ``None``
        if no row matches.
    """
    cur = conn.execute(
        "SELECT * FROM edges WHERE source_id = ? AND target_id = ?",
        (source_id, target_id),
    )
    row = cur.fetchone()
    if row is None:
        return None
    # Take the names from the cursor that produced the row: they are guaranteed
    # to line up with the selected values and need no extra PRAGMA query.
    cols = [col[0] for col in cur.description]
    d = dict(zip(cols, row))
    d["knowledge"] = json.loads(d.pop("knowledge_json"))
    return d
|
|
|
|
|
|
def list_edges_for(conn: Connection, source_id: str) -> list[dict]:
    """Return every edge whose ``source_id`` matches, ordered by ``target_id``.

    Args:
        conn: Open SQLite connection holding the ``edges`` table.
        source_id: Value matched against the ``source_id`` column.

    Returns:
        One dict per row, keyed by column name, in which the raw
        ``knowledge_json`` column is replaced by its decoded value under the
        key ``knowledge``. Empty list when nothing matches.
    """
    cur = conn.execute(
        "SELECT * FROM edges WHERE source_id = ? ORDER BY target_id",
        (source_id,),
    )
    # Take the names from the cursor that produced the rows: they are
    # guaranteed to line up with the selected values and need no extra
    # PRAGMA query.
    cols = [col[0] for col in cur.description]
    out: list[dict] = []
    for row in cur.fetchall():
        d = dict(zip(cols, row))
        d["knowledge"] = json.loads(d.pop("knowledge_json"))
        out.append(d)
    return out
|