feat: directed edges with per-turn delta projector

This commit is contained in:
Joseph Doherty
2026-04-26 11:51:15 -04:00
parent 7e6c2985dd
commit bc97d425ef
3 changed files with 259 additions and 0 deletions
+13
View File
@@ -0,0 +1,13 @@
CREATE TABLE edges (
id INTEGER PRIMARY KEY,
chat_id TEXT,
source_id TEXT NOT NULL,
target_id TEXT NOT NULL,
affinity INTEGER NOT NULL DEFAULT 50,
trust INTEGER NOT NULL DEFAULT 50,
summary TEXT NOT NULL DEFAULT '',
knowledge_json TEXT NOT NULL DEFAULT '[]',
last_interaction_chat_id TEXT,
last_interaction_at TEXT,
UNIQUE (source_id, target_id)
);
+84
View File
@@ -0,0 +1,84 @@
from __future__ import annotations
import json
from sqlite3 import Connection
from chat.eventlog.projector import on
from chat.eventlog.log import Event
def _clamp(value: int, lo: int = 0, hi: int = 100) -> int:
return max(lo, min(hi, value))
@on("edge_update")
def _apply_edge_update(conn: Connection, e: Event) -> None:
p = e.payload
source_id = p["source_id"]
target_id = p["target_id"]
chat_id = p.get("chat_id")
# Upsert: ensure a row exists with defaults, then apply deltas.
conn.execute(
"INSERT OR IGNORE INTO edges (chat_id, source_id, target_id) VALUES (?, ?, ?)",
(chat_id, source_id, target_id),
)
row = conn.execute(
"SELECT affinity, trust, knowledge_json, last_interaction_chat_id, last_interaction_at "
"FROM edges WHERE source_id = ? AND target_id = ?",
(source_id, target_id),
).fetchone()
affinity, trust, knowledge_json, last_chat_id, last_at = row
affinity_delta = int(p.get("affinity_delta", 0))
trust_delta = int(p.get("trust_delta", 0))
new_affinity = _clamp(affinity + affinity_delta)
new_trust = _clamp(trust + trust_delta)
new_facts = p.get("knowledge_facts") or []
if new_facts:
knowledge = json.loads(knowledge_json)
knowledge.extend(new_facts)
knowledge_json = json.dumps(knowledge)
payload_at = p.get("last_interaction_at")
payload_chat_id = p.get("last_interaction_chat_id")
if payload_at is not None:
last_at = payload_at
if payload_chat_id is not None:
last_chat_id = payload_chat_id
conn.execute(
"UPDATE edges SET affinity = ?, trust = ?, knowledge_json = ?, "
"last_interaction_chat_id = ?, last_interaction_at = ? "
"WHERE source_id = ? AND target_id = ?",
(new_affinity, new_trust, knowledge_json, last_chat_id, last_at,
source_id, target_id),
)
def get_edge(conn: Connection, source_id: str, target_id: str) -> dict | None:
row = conn.execute(
"SELECT * FROM edges WHERE source_id = ? AND target_id = ?",
(source_id, target_id),
).fetchone()
if not row:
return None
cols = [c[1] for c in conn.execute("PRAGMA table_info(edges)").fetchall()]
d = dict(zip(cols, row))
d["knowledge"] = json.loads(d.pop("knowledge_json"))
return d
def list_edges_for(conn: Connection, source_id: str) -> list[dict]:
cur = conn.execute(
"SELECT * FROM edges WHERE source_id = ? ORDER BY target_id",
(source_id,),
)
rows = cur.fetchall()
cols = [c[1] for c in conn.execute("PRAGMA table_info(edges)").fetchall()]
out: list[dict] = []
for row in rows:
d = dict(zip(cols, row))
d["knowledge"] = json.loads(d.pop("knowledge_json"))
out.append(d)
return out
+162
View File
@@ -0,0 +1,162 @@
from chat.db.migrate import apply_migrations
from chat.db.connection import open_db
from chat.eventlog.log import append_event
from chat.eventlog.projector import project
from chat.state.edges import get_edge, list_edges_for
import chat.state.entities # registers bot/you handlers
import chat.state.edges # registers edge_update handler
def _seed_entities(conn) -> None:
append_event(conn, kind="bot_authored", payload={
"id": "bot_a", "name": "BotA", "persona": "p",
"voice_samples": [], "traits": [],
"backstory": "", "initial_relationship_to_you": "",
"kickoff_prose": "",
})
append_event(conn, kind="you_authored", payload={
"name": "Me", "pronouns": "", "persona": "",
})
def test_edge_update_upsert_applies_first_delta(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_entities(conn)
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"affinity_delta": 5,
})
project(conn)
edge = get_edge(conn, "bot_a", "you")
assert edge is not None
assert edge["affinity"] == 55
assert edge["trust"] == 50
assert edge["knowledge"] == []
assert edge["summary"] == ""
def test_edge_update_multiple_deltas_accumulate(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_entities(conn)
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"affinity_delta": 5,
})
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"affinity_delta": -3,
"trust_delta": 2,
"knowledge_facts": ["she has a sister"],
})
project(conn)
edge = get_edge(conn, "bot_a", "you")
assert edge is not None
assert edge["affinity"] == 52
assert edge["trust"] == 52
assert edge["knowledge"] == ["she has a sister"]
def test_edge_update_clamps_affinity_at_max(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_entities(conn)
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"affinity_delta": 5,
})
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"affinity_delta": 100,
})
project(conn)
edge = get_edge(conn, "bot_a", "you")
assert edge is not None
assert edge["affinity"] == 100
def test_edge_update_clamps_trust_at_min(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_entities(conn)
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"trust_delta": -200,
})
project(conn)
edge = get_edge(conn, "bot_a", "you")
assert edge is not None
assert edge["trust"] == 0
def test_edges_are_directed_and_asymmetric(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_entities(conn)
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"affinity_delta": 5,
})
append_event(conn, kind="edge_update", payload={
"source_id": "you", "target_id": "bot_a",
"affinity_delta": 10,
})
project(conn)
forward = get_edge(conn, "bot_a", "you")
reverse = get_edge(conn, "you", "bot_a")
assert forward is not None and reverse is not None
assert forward["affinity"] == 55
assert reverse["affinity"] == 60
# Independent rows
assert forward["affinity"] != reverse["affinity"]
def test_edge_update_bumps_last_interaction(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_entities(conn)
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you",
"affinity_delta": 1,
"last_interaction_at": "2026-04-26T10:00:00",
"last_interaction_chat_id": "chat_bot_a",
})
project(conn)
edge = get_edge(conn, "bot_a", "you")
assert edge is not None
assert edge["last_interaction_at"] == "2026-04-26T10:00:00"
assert edge["last_interaction_chat_id"] == "chat_bot_a"
def test_list_edges_for_returns_outgoing_only(tmp_path):
db = tmp_path / "t.db"
apply_migrations(db)
with open_db(db) as conn:
_seed_entities(conn)
append_event(conn, kind="bot_authored", payload={
"id": "bot_b", "name": "BotB", "persona": "p",
"voice_samples": [], "traits": [],
"backstory": "", "initial_relationship_to_you": "",
"kickoff_prose": "",
})
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "you", "affinity_delta": 1,
})
append_event(conn, kind="edge_update", payload={
"source_id": "bot_a", "target_id": "bot_b", "affinity_delta": 2,
})
append_event(conn, kind="edge_update", payload={
"source_id": "bot_b", "target_id": "bot_a", "affinity_delta": 3,
})
project(conn)
outgoing = list_edges_for(conn, "bot_a")
targets = [e["target_id"] for e in outgoing]
assert targets == sorted(targets)
assert set(targets) == {"you", "bot_b"}