Files
natsnet/tools/batch_planner.py

648 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Batch planner for NatsNet porting project.
Copies porting.db -> porting_batches.db, creates batch assignment tables,
and populates 42 batches (0-41) with deferred features and tests.
Usage:
python3 tools/batch_planner.py
"""
import shutil
import sqlite3
import sys
from pathlib import Path
# Project root: the directory one level above the folder holding this script.
ROOT = Path(__file__).resolve().parents[1]
# Input database; main() only copies it and never opens it for writing.
DB_SOURCE = ROOT.joinpath("porting.db")
# Working copy that receives the batch tables and assignments.
DB_TARGET = ROOT.joinpath("porting_batches.db")
# DDL for the three planner tables; main() runs it with executescript().
#   implementation_batches - one row per batch (metadata plus cached counts).
#   batch_features         - join table: batch id <-> features.id.
#   batch_tests            - join table: batch id <-> unit_tests.id.
# The composite primary keys only forbid the same (batch, item) pair twice;
# keeping an item out of a second batch is done by the assignment queries,
# which skip ids already present in batch_features.
# IF NOT EXISTS means pre-existing tables are left untouched.
NEW_TABLES_SQL = """
CREATE TABLE IF NOT EXISTS implementation_batches (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
priority INTEGER NOT NULL,
feature_count INTEGER DEFAULT 0,
test_count INTEGER DEFAULT 0,
status TEXT DEFAULT 'pending',
depends_on TEXT,
go_files TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS batch_features (
batch_id INTEGER NOT NULL REFERENCES implementation_batches(id),
feature_id INTEGER NOT NULL REFERENCES features(id),
PRIMARY KEY (batch_id, feature_id)
);
CREATE TABLE IF NOT EXISTS batch_tests (
batch_id INTEGER NOT NULL REFERENCES implementation_batches(id),
test_id INTEGER NOT NULL REFERENCES unit_tests(id),
PRIMARY KEY (batch_id, test_id)
);
"""
# Batch metadata, one tuple per batch:
#   (id, name, description, priority, depends_on, go_files)
# - id and priority carry the same value in every entry below.
# - depends_on is a comma-separated string of prerequisite batch ids, or None.
# - go_files is a comma-separated string of Go file names (written without
#   the "server/" prefix that features.go_file values use), or None.
# main() stores these values verbatim in implementation_batches. The actual
# feature-to-batch assignment is done by assign_all_features() and does not
# read this table, so names and descriptions are descriptive only.
# NOTE: batches 32-35 are filled by split_file_evenly() (an even split in
# line-number order), so their names describe the split only approximately.
BATCH_META = [
(0, "Implementable Tests",
"Tests whose feature dependencies are all already verified",
0, None, None),
(1, "Proto, Const, CipherSuites, NKey, JWT",
"Foundation protocol and crypto types",
1, None, "proto.go,const.go,ciphersuites.go,nkey.go,jwt.go"),
(2, "Parser, Sublist, MemStore remainders",
"Core data structure remainders",
2, "1", "parser.go,sublist.go,memstore.go"),
(3, "SendQ, Service, Client ProxyProto",
"Send queue, OS service, proxy protocol",
3, "1", "sendq.go,service.go,service_windows.go,client_proxyproto.go"),
(4, "Logging",
"Server logging infrastructure",
4, "1", "log.go"),
(5, "JetStream Errors",
"JetStream error constructors (code-gen recommended)",
5, None, "jetstream_errors_generated.go,jetstream_errors.go"),
(6, "Opts package-level functions",
"Options parsing package-level functions",
6, "4", "opts.go"),
(7, "Opts class methods + Reload",
"Options struct methods and config reload",
7, "6", "opts.go,reload.go"),
(8, "Store Interfaces",
"JetStream store interface definitions",
8, None, "store.go"),
(9, "Auth, DirStore, OCSP foundations",
"Authentication, directory store, OCSP basics",
9, "4,6", "auth.go,dirstore.go,ocsp.go,ocsp_peer.go"),
(10, "OCSP Cache + JS Events",
"OCSP response cache and JetStream events",
10, "9", "ocsp_responsecache.go,jetstream_events.go"),
(11, "FileStore Init",
"FileStore package funcs + fileStore methods <= line 1500",
11, "8", "filestore.go"),
(12, "FileStore Recovery",
"fileStore methods lines 1501-2800",
12, "11", "filestore.go"),
(13, "FileStore Read/Query",
"fileStore methods lines 2801-4200",
13, "12", "filestore.go"),
(14, "FileStore Write/Lifecycle",
"fileStore methods lines 4201+",
14, "13", "filestore.go"),
(15, "MsgBlock + ConsumerFileStore",
"msgBlock and consumerFileStore classes + remaining filestore",
15, "14", "filestore.go"),
(16, "Client Core (first half)",
"Client methods <= line 3000",
16, "2,3,4", "client.go"),
(17, "Client Core (second half)",
"Client methods > line 3000",
17, "16", "client.go"),
(18, "Server Core",
"Core server methods",
18, "4,16", "server.go"),
(19, "Accounts Core",
"Account class methods",
19, "16,18", "accounts.go"),
(20, "Accounts Resolvers",
"Account resolvers, package funcs, Server account methods",
20, "19", "accounts.go"),
(21, "Events + MsgTrace",
"Server events and message tracing",
21, "18,19", "events.go,msgtrace.go"),
(22, "Monitoring",
"Server monitoring endpoints",
22, "18,19", "monitor.go"),
(23, "Routes",
"Route connection handling",
23, "16,18", "route.go"),
(24, "Leaf Nodes",
"Leaf node connection handling",
24, "19,23", "leafnode.go"),
(25, "Gateways",
"Gateway connection handling",
25, "19,23", "gateway.go"),
(26, "WebSocket",
"WebSocket transport layer",
26, "16,18", "websocket.go"),
(27, "JetStream Core",
"Core JetStream functionality",
27, "5,8,19", "jetstream.go"),
(28, "JetStream API",
"JetStream API handlers",
28, "5,27", "jetstream_api.go"),
(29, "JetStream Batching",
"JetStream message batching",
29, "27", "jetstream_batching.go"),
(30, "Raft Part 1",
"Raft consensus <= line 3200 + pkg funcs + small types",
30, "4,18", "raft.go"),
(31, "Raft Part 2",
"Raft consensus > line 3200",
31, "30", "raft.go"),
(32, "JS Cluster Meta",
"JetStream cluster pkg funcs + small types + init methods",
32, "27,31", "jetstream_cluster.go"),
(33, "JS Cluster Streams",
"JetStream cluster stream operations",
33, "32", "jetstream_cluster.go"),
(34, "JS Cluster Consumers",
"JetStream cluster consumer operations",
34, "33", "jetstream_cluster.go"),
(35, "JS Cluster Remaining",
"All remaining JetStream cluster features",
35, "32", "jetstream_cluster.go"),
(36, "Stream Lifecycle",
"Stream features <= line 4600",
36, "8,11,12,13,14,15,28", "stream.go"),
(37, "Stream Messages",
"Stream features > line 4600",
37, "36", "stream.go"),
(38, "Consumer Lifecycle",
"Consumer features <= line 3800",
38, "34,36", "consumer.go"),
(39, "Consumer Dispatch",
"Consumer features > line 3800",
39, "38", "consumer.go"),
(40, "MQTT Server/JSA",
"MQTT features <= line 3500",
40, "19,27", "mqtt.go"),
(41, "MQTT Client/IO",
"MQTT features > line 3500",
41, "40", "mqtt.go"),
]
# Maps a test's go_file to the source go_file whose batch it should join when
# the test is placed by file rather than by dependency (see get_orphan_batch).
# Only exceptions are listed: any test file missing from this mapping falls
# back to the default rule of turning "_test.go" into ".go".
TEST_FILE_TO_SOURCE = {
"server/jetstream_consumer_test.go": "server/consumer.go",
"server/jetstream_cluster_1_test.go": "server/jetstream_cluster.go",
"server/jetstream_cluster_2_test.go": "server/jetstream_cluster.go",
"server/jetstream_cluster_3_test.go": "server/jetstream_cluster.go",
"server/jetstream_cluster_4_test.go": "server/jetstream_cluster.go",
"server/jetstream_super_cluster_test.go": "server/jetstream_cluster.go",
"server/jetstream_leafnode_test.go": "server/leafnode.go",
"server/jetstream_jwt_test.go": "server/jwt.go",
"server/jetstream_benchmark_test.go": "server/jetstream.go",
"server/norace_1_test.go": "server/server.go",
"server/norace_2_test.go": "server/server.go",
"server/routes_test.go": "server/route.go",
"server/auth_callout_test.go": "server/auth.go",
}
def assign_features_by_file(cur, batch_id, go_files):
    """Assign all unassigned deferred features from the given files to a batch.

    Args:
        cur: sqlite3 cursor on a database containing ``features`` and
            ``batch_features``.
        batch_id: Batch that receives the matching features.
        go_files: ``features.go_file`` values to match. Any iterable is
            accepted (list, tuple, generator); a single string is treated as
            one file name rather than as a sequence of characters.

    Returns:
        Number of rows inserted into ``batch_features`` (0 when ``go_files``
        is empty).
    """
    # Materialize once: the values are needed both to count placeholders and
    # as bound parameters, and a generator could only be consumed a single time.
    files = [go_files] if isinstance(go_files, str) else list(go_files)
    if not files:
        return 0
    placeholders = ",".join("?" * len(files))
    cur.execute(
        "\n"
        "INSERT INTO batch_features (batch_id, feature_id)\n"
        "SELECT ?, f.id FROM features f\n"
        "WHERE f.status = 'deferred'\n"
        f"AND f.go_file IN ({placeholders})\n"
        # Features already placed in any batch are never reassigned.
        "AND f.id NOT IN (SELECT feature_id FROM batch_features)\n",
        [batch_id, *files],
    )
    return cur.rowcount
def assign_features_by_query(cur, batch_id, where_clause, params=None):
    """Assign unassigned deferred features matching a WHERE clause to a batch.

    Args:
        cur: sqlite3 cursor on a database containing ``features`` and
            ``batch_features``.
        batch_id: Batch that receives the matching features.
        where_clause: SQL boolean expression over the ``features`` alias
            ``f``. It is interpolated into the statement verbatim, so it must
            be trusted SQL; pass values through ``params`` instead.
        params: Optional iterable of values for ``?`` placeholders inside
            ``where_clause`` (list, tuple, ...).

    Returns:
        Number of rows inserted into ``batch_features``.
    """
    # Accept any iterable (a tuple is the usual DB-API parameter form), not
    # only a list that can be concatenated onto [batch_id].
    extra_params = [] if params is None else list(params)
    cur.execute(
        "\n"
        "INSERT INTO batch_features (batch_id, feature_id)\n"
        "SELECT ?, f.id FROM features f\n"
        "WHERE f.status = 'deferred'\n"
        # Features already placed in any batch are never reassigned.
        "AND f.id NOT IN (SELECT feature_id FROM batch_features)\n"
        f"AND ({where_clause})\n",
        [batch_id, *extra_params],
    )
    return cur.rowcount
def split_file_evenly(cur, go_file, batch_ids):
    """Split a file's unassigned deferred features evenly across batches.

    Features are taken in line-number order and cut into ``len(batch_ids)``
    contiguous chunks; when the count does not divide evenly, the earliest
    batches each receive one extra feature.

    Args:
        cur: sqlite3 cursor on a database containing ``features`` and
            ``batch_features``.
        go_file: ``features.go_file`` value whose features are split.
        batch_ids: Iterable of batch ids, in the order chunks are handed out.

    Returns:
        Number of features assigned (0 when there are no batches or no
        matching features).
    """
    targets = list(batch_ids)
    cur.execute(
        "\n"
        "SELECT f.id FROM features f\n"
        "WHERE f.status = 'deferred'\n"
        "AND f.go_file = ?\n"
        "AND f.id NOT IN (SELECT feature_id FROM batch_features)\n"
        # The id tie-break keeps the split reproducible when several features
        # share a line number (or have none); otherwise their relative order
        # is left to the database.
        "ORDER BY f.go_line_number, f.id\n",
        (go_file,),
    )
    feature_ids = [row[0] for row in cur.fetchall()]
    if not targets or not feature_ids:
        return 0

    base_size, extra = divmod(len(feature_ids), len(targets))
    assignments = []
    start = 0
    for index, bid in enumerate(targets):
        # The first `extra` batches absorb the remainder, one feature each.
        stop = start + base_size + (1 if index < extra else 0)
        assignments.extend((bid, fid) for fid in feature_ids[start:stop])
        start = stop

    cur.executemany(
        "INSERT INTO batch_features (batch_id, feature_id) VALUES (?, ?)",
        assignments,
    )
    return len(assignments)
def assign_all_features(cur):
    """Assign every deferred feature to exactly one batch.

    The calls below are order-dependent: each helper only touches features
    that are not yet in ``batch_features``, so a later "catch remaining" call
    for a file picks up whatever the earlier, narrower query for that file
    left behind. A final sweep puts anything still unassigned into batch 18.

    Args:
        cur: sqlite3 cursor on a database containing ``features`` and
            ``batch_features``.
    """
    # B1: Proto, Const, CipherSuites, NKey, JWT
    assign_features_by_file(cur, 1, [
        "server/proto.go", "server/const.go", "server/ciphersuites.go",
        "server/nkey.go", "server/jwt.go",
    ])
    # B2: Parser, Sublist, MemStore remainders
    assign_features_by_file(cur, 2, [
        "server/parser.go", "server/sublist.go", "server/memstore.go",
    ])
    # B3: SendQ, Service, Client ProxyProto
    assign_features_by_file(cur, 3, [
        "server/sendq.go", "server/service.go", "server/service_windows.go",
        "server/client_proxyproto.go",
    ])
    # B4: Logging
    assign_features_by_file(cur, 4, ["server/log.go"])
    # B5: JetStream Errors
    assign_features_by_file(cur, 5, [
        "server/jetstream_errors_generated.go", "server/jetstream_errors.go",
    ])
    # B6: Opts package-level functions (go_class is empty string or NULL)
    assign_features_by_query(
        cur, 6,
        "f.go_file = 'server/opts.go' AND (f.go_class = '' OR f.go_class IS NULL)")
    # B7: Opts class methods + Reload
    assign_features_by_query(
        cur, 7,
        "f.go_file = 'server/opts.go' AND f.go_class != '' AND f.go_class IS NOT NULL")
    assign_features_by_file(cur, 7, ["server/reload.go"])
    # B8: Store Interfaces
    assign_features_by_file(cur, 8, ["server/store.go"])
    # B9: Auth, DirStore, OCSP foundations
    assign_features_by_file(cur, 9, [
        "server/auth.go", "server/dirstore.go",
        "server/ocsp.go", "server/ocsp_peer.go",
    ])
    # B10: OCSP Cache + JS Events
    assign_features_by_file(cur, 10, [
        "server/ocsp_responsecache.go", "server/jetstream_events.go",
    ])

    # --- FileStore (B11-B15) ---
    # B11: Package funcs + fileStore methods <= line 1500
    assign_features_by_query(
        cur, 11,
        "f.go_file = 'server/filestore.go' AND "
        "((f.go_class = '' OR f.go_class IS NULL) OR "
        " (f.go_class = 'fileStore' AND f.go_line_number <= 1500))")
    # B12: fileStore methods lines 1501-2800
    assign_features_by_query(
        cur, 12,
        "f.go_file = 'server/filestore.go' AND f.go_class = 'fileStore' "
        "AND f.go_line_number > 1500 AND f.go_line_number <= 2800")
    # B13: fileStore methods lines 2801-4200
    assign_features_by_query(
        cur, 13,
        "f.go_file = 'server/filestore.go' AND f.go_class = 'fileStore' "
        "AND f.go_line_number > 2800 AND f.go_line_number <= 4200")
    # B14: fileStore methods lines 4201+
    assign_features_by_query(
        cur, 14,
        "f.go_file = 'server/filestore.go' AND f.go_class = 'fileStore' "
        "AND f.go_line_number > 4200")
    # B15: msgBlock + consumerFileStore + remaining filestore classes
    assign_features_by_file(cur, 15, ["server/filestore.go"])

    # --- Client (B16-B17) ---
    # B16: Client methods <= line 3000
    assign_features_by_query(
        cur, 16,
        "f.go_file = 'server/client.go' AND f.go_line_number <= 3000")
    # B17: Client methods > line 3000 (catch remaining)
    assign_features_by_file(cur, 17, ["server/client.go"])

    # B18: Server Core
    assign_features_by_file(cur, 18, ["server/server.go"])

    # --- Accounts (B19-B20) ---
    # B19: Account class
    assign_features_by_query(
        cur, 19,
        "f.go_file = 'server/accounts.go' AND f.go_class = 'Account'")
    # B20: Remaining accounts (resolvers, pkg funcs, Server methods)
    assign_features_by_file(cur, 20, ["server/accounts.go"])

    # B21: Events + MsgTrace
    assign_features_by_file(cur, 21, ["server/events.go", "server/msgtrace.go"])
    # B22: Monitoring
    assign_features_by_file(cur, 22, ["server/monitor.go"])
    # B23: Routes
    assign_features_by_file(cur, 23, ["server/route.go"])
    # B24: Leaf Nodes
    assign_features_by_file(cur, 24, ["server/leafnode.go"])
    # B25: Gateways
    assign_features_by_file(cur, 25, ["server/gateway.go"])
    # B26: WebSocket
    assign_features_by_file(cur, 26, ["server/websocket.go"])
    # B27: JetStream Core
    assign_features_by_file(cur, 27, ["server/jetstream.go"])
    # B28: JetStream API
    assign_features_by_file(cur, 28, ["server/jetstream_api.go"])
    # B29: JetStream Batching
    assign_features_by_file(cur, 29, ["server/jetstream_batching.go"])

    # --- Raft (B30-B31) ---
    # B30: pkg funcs + small types + raft class <= line 3200
    assign_features_by_query(
        cur, 30,
        "f.go_file = 'server/raft.go' AND "
        "(f.go_class IS NULL OR f.go_class != 'raft' OR "
        " (f.go_class = 'raft' AND f.go_line_number <= 3200))")
    # B31: raft class > line 3200 (catch remaining)
    assign_features_by_file(cur, 31, ["server/raft.go"])

    # --- JetStream Cluster (B32-B35): split 231 features by line number ---
    split_file_evenly(cur, "server/jetstream_cluster.go", [32, 33, 34, 35])

    # --- Stream (B36-B37) ---
    # B36: Stream features <= line 4600
    assign_features_by_query(
        cur, 36,
        "f.go_file = 'server/stream.go' AND f.go_line_number <= 4600")
    # B37: Stream features > line 4600 (catch remaining)
    assign_features_by_file(cur, 37, ["server/stream.go"])

    # --- Consumer (B38-B39) ---
    # B38: Consumer features <= line 3800
    assign_features_by_query(
        cur, 38,
        "f.go_file = 'server/consumer.go' AND f.go_line_number <= 3800")
    # B39: Consumer features > line 3800 (catch remaining)
    assign_features_by_file(cur, 39, ["server/consumer.go"])

    # --- MQTT (B40-B41) ---
    # B40: MQTT features <= line 3500
    assign_features_by_query(
        cur, 40,
        "f.go_file = 'server/mqtt.go' AND f.go_line_number <= 3500")
    # B41: MQTT features > line 3500 (catch remaining)
    assign_features_by_file(cur, 41, ["server/mqtt.go"])

    # --- Sweep: assign any remaining deferred features ---
    cur.execute(
        "\n"
        "SELECT DISTINCT f.go_file FROM features f\n"
        "WHERE f.status = 'deferred'\n"
        "AND f.id NOT IN (SELECT feature_id FROM batch_features)\n"
    )
    remaining_files = [row[0] for row in cur.fetchall()]
    if remaining_files:
        # One unconditional pass over everything still unassigned. Matching
        # per file with "go_file IN (?)" would silently skip features whose
        # go_file is NULL, because NULL never compares equal in SQL.
        assign_features_by_query(cur, 18, "1 = 1")
        print(f" Sweep: {len(remaining_files)} extra files assigned to B18: "
              f"{remaining_files}")
def get_orphan_batch(cur, test_go_file):
    """Find the primary batch for an orphan test based on its go_file.

    The test file is mapped to a source file, first through
    ``TEST_FILE_TO_SOURCE`` and otherwise by turning a trailing ``_test.go``
    into ``.go``. The result is the lowest batch id that holds any feature of
    that source file.

    Args:
        cur: sqlite3 cursor on a database containing ``features`` and
            ``batch_features``.
        test_go_file: The test's ``go_file`` value; may be None or empty.

    Returns:
        The batch id, or 18 (Server Core) when the file is unknown or none of
        its features is in a batch.
    """
    fallback_batch = 18  # Server Core
    if not test_go_file:
        # A test row without a file cannot be mapped to any source file.
        return fallback_batch

    source_file = TEST_FILE_TO_SOURCE.get(test_go_file)
    if source_file is None:
        test_suffix = "_test.go"
        if test_go_file.endswith(test_suffix):
            # Rewrite only the suffix, never an earlier "_test.go" in the path.
            source_file = test_go_file[:-len(test_suffix)] + ".go"
        else:
            source_file = test_go_file

    cur.execute(
        "\n"
        "SELECT MIN(bf.batch_id) FROM batch_features bf\n"
        "JOIN features f ON bf.feature_id = f.id\n"
        "WHERE f.go_file = ?\n",
        (source_file,),
    )
    row = cur.fetchone()
    # MIN() over zero rows still returns one row holding NULL.
    if row and row[0] is not None:
        return row[0]
    return fallback_batch
def assign_tests(cur):
    """Place every deferred unit test into a batch and print a tally.

    Placement rules, per test:
      * all feature dependencies are verified/complete/n_a -> batch 0;
      * otherwise -> the highest batch holding one of its unfinished
        dependencies;
      * no feature dependencies at all, or none of the unfinished ones is in
        a batch -> the batch chosen by get_orphan_batch() from the test file.

    Args:
        cur: sqlite3 cursor on a database containing ``unit_tests``,
            ``dependencies``, ``features``, ``batch_features`` and
            ``batch_tests``.
    """
    finished_states = ("verified", "complete", "n_a")
    cur.execute("SELECT id, go_file FROM unit_tests WHERE status = 'deferred'")
    pending_tests = cur.fetchall()

    implementable = 0
    by_dependency = 0
    by_file = 0

    for test_id, test_go_file in pending_tests:
        # Feature dependencies of this test, with each feature's status.
        cur.execute(
            "\n"
            "SELECT d.target_id, f.status\n"
            "FROM dependencies d\n"
            "JOIN features f ON d.target_id = f.id AND d.target_type = 'feature'\n"
            "WHERE d.source_type = 'unit_test' AND d.source_id = ?\n",
            (test_id,),
        )
        dep_rows = cur.fetchall()

        batch_id = None  # stays None until a dependency-based rule decides
        if dep_rows:
            open_feature_ids = [
                feature_id
                for feature_id, state in dep_rows
                if state not in finished_states
            ]
            if not open_feature_ids:
                # Nothing left to wait for.
                batch_id = 0
                implementable += 1
            else:
                # The test can only run once its latest dependency is done.
                marks = ",".join("?" * len(open_feature_ids))
                cur.execute(
                    "\n"
                    "SELECT MAX(bf.batch_id) FROM batch_features bf\n"
                    f"WHERE bf.feature_id IN ({marks})\n",
                    open_feature_ids,
                )
                found = cur.fetchone()
                if found and found[0] is not None:
                    batch_id = found[0]
                    by_dependency += 1

        if batch_id is None:
            # Orphan: no dependency rows, or none of them is in a batch.
            batch_id = get_orphan_batch(cur, test_go_file)
            by_file += 1

        cur.execute(
            "INSERT INTO batch_tests (batch_id, test_id) VALUES (?, ?)",
            (batch_id, test_id),
        )

    print(f" Batch 0 (implementable): {implementable}")
    print(f" By dependency: {by_dependency}")
    print(f" Orphans (by file): {by_file}")
def update_counts(cur):
    """Refresh the cached feature_count and test_count of every batch.

    Each count is recomputed from the join tables with a correlated subquery,
    so batches without any assignment end up with 0.

    Args:
        cur: sqlite3 cursor on a database containing
            ``implementation_batches``, ``batch_features`` and ``batch_tests``.
    """
    recount_sql = (
        "\n"
        "UPDATE implementation_batches SET\n"
        "feature_count = (\n"
        "SELECT COUNT(*) FROM batch_features\n"
        "WHERE batch_id = implementation_batches.id\n"
        "),\n"
        "test_count = (\n"
        "SELECT COUNT(*) FROM batch_tests\n"
        "WHERE batch_id = implementation_batches.id\n"
        ")\n"
    )
    cur.execute(recount_sql)
def print_summary(cur):
    """Print the per-batch table, an assignment check and a spot-check.

    Output, in order: a banner; one row per batch with its cached feature and
    test counts plus a TOTAL row; the number of deferred features and tests
    not present in any batch (with a per-file breakdown of unassigned
    features); and the batch that holds the ``raft.shutdown`` feature.

    Args:
        cur: sqlite3 cursor on the planned database.
    """
    banner = "=" * 80
    rule = "-" * 70

    def scalar(sql):
        # Run a single-value query and return that value.
        cur.execute(sql)
        return cur.fetchone()[0]

    print()
    print(banner)
    print("BATCH PLANNER SUMMARY")
    print(banner)

    cur.execute(
        "\n"
        "SELECT id, name, feature_count, test_count, depends_on\n"
        "FROM implementation_batches ORDER BY id\n"
    )
    batches = cur.fetchall()

    print(f"\n{'ID':>3} {'Name':<35} {'Feats':>5} {'Tests':>5} {'Depends':>15}")
    print(rule)
    feature_total = 0
    test_total = 0
    for batch_id, title, n_features, n_tests, depends_on in batches:
        feature_total += n_features
        test_total += n_tests
        shown_deps = depends_on or "-"
        print(f"{batch_id:>3} {title:<35} {n_features:>5} {n_tests:>5} {shown_deps:>15}")
    print(rule)
    print(f"{'':>3} {'TOTAL':<35} {feature_total:>5} {test_total:>5}")

    # Verification: every deferred item should be in some batch.
    missing_features = scalar(
        "\n"
        "SELECT COUNT(*) FROM features\n"
        "WHERE status = 'deferred'\n"
        "AND id NOT IN (SELECT feature_id FROM batch_features)\n"
    )
    missing_tests = scalar(
        "\n"
        "SELECT COUNT(*) FROM unit_tests\n"
        "WHERE status = 'deferred'\n"
        "AND id NOT IN (SELECT test_id FROM batch_tests)\n"
    )
    print(f"\nUnassigned deferred features: {missing_features}")
    print(f"Unassigned deferred tests: {missing_tests}")

    if missing_features != 0 or missing_tests != 0:
        print("\nWARNING: Some items remain unassigned!")
        if missing_features > 0:
            cur.execute(
                "\n"
                "SELECT go_file, COUNT(*) FROM features\n"
                "WHERE status = 'deferred'\n"
                "AND id NOT IN (SELECT feature_id FROM batch_features)\n"
                "GROUP BY go_file\n"
            )
            for go_file, cnt in cur.fetchall():
                print(f" Unassigned features in {go_file}: {cnt}")
    else:
        print("\nAll deferred items assigned to batches.")

    # Spot-check: raft.shutdown
    cur.execute(
        "\n"
        "SELECT bf.batch_id, f.go_method FROM batch_features bf\n"
        "JOIN features f ON bf.feature_id = f.id\n"
        "WHERE f.go_method = 'shutdown' AND f.go_class = 'raft'\n"
    )
    hits = cur.fetchall()
    if not hits:
        print("\nSpot-check: raft.shutdown not found in deferred features")
    else:
        print(f"\nSpot-check: raft.shutdown -> batch {hits[0][0]}")
def main():
    """Build porting_batches.db from porting.db and print the batch plan.

    Steps: copy the source database over the target path, create the batch
    tables, insert the batch metadata, assign features and tests, refresh the
    cached counts, commit, and print the summary.

    Exits with status 1 when the source database does not exist. If any step
    raises, nothing is committed and the connection is still closed; the
    copied target file is left on disk.
    """
    if not DB_SOURCE.exists():
        print(f"Error: {DB_SOURCE} not found", file=sys.stderr)
        sys.exit(1)

    # Step 1: Copy database (an existing target file is overwritten)
    print(f"Copying {DB_SOURCE} -> {DB_TARGET}")
    shutil.copy2(DB_SOURCE, DB_TARGET)

    conn = sqlite3.connect(str(DB_TARGET))
    try:
        cur = conn.cursor()
        cur.execute("PRAGMA foreign_keys = ON")

        # Step 2: Create new tables
        print("Creating batch tables...")
        cur.executescript(NEW_TABLES_SQL)

        # Step 3: Insert batch metadata
        # BATCH_META tuples are already in the column order listed here.
        print("Inserting batch metadata...")
        cur.executemany(
            "INSERT INTO implementation_batches "
            "(id, name, description, priority, depends_on, go_files) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            BATCH_META,
        )

        # Step 4: Assign features (must precede tests: test placement reads
        # batch_features)
        print("Assigning features to batches...")
        assign_all_features(cur)

        # Step 5: Assign tests
        print("Assigning tests to batches...")
        assign_tests(cur)

        # Step 6: Update counts
        print("Updating batch counts...")
        update_counts(cur)
        conn.commit()

        # Step 7: Print summary
        print_summary(cur)
    finally:
        # Release the database handle even when a step above fails.
        conn.close()

    print(f"\nDone. Output: {DB_TARGET}")


if __name__ == "__main__":
    main()