from __future__ import annotations import argparse import csv import struct from collections import defaultdict from pathlib import Path def load_rows(path: Path) -> list[dict[str, str]]: with path.open("r", encoding="utf-8", newline="") as handle: return list(csv.DictReader(handle, delimiter="\t")) def body_bytes(row: dict[str, str]) -> bytes: return bytes.fromhex(row["hex"]) def ordinal_groups(rows: list[dict[str, str]]) -> dict[tuple[str, str, str], list[dict[str, str]]]: groups: dict[tuple[str, str, str], list[dict[str, str]]] = defaultdict(list) for row in rows: key = (row["direction"], row["record_type"], row["record_size"]) groups[key].append(row) return groups def i32_at(data: bytes, offset: int) -> int | None: if offset + 4 > len(data): return None return struct.unpack_from(" list[int]: limit = min(len(a), len(b)) offsets = [index for index in range(limit) if a[index] != b[index]] offsets.extend(range(limit, max(len(a), len(b)))) return offsets def byte_pairs(offsets: list[int], a: bytes, b: bytes, limit: int) -> str: pairs: list[str] = [] for offset in offsets[:limit]: left = f"{a[offset]:02x}" if offset < len(a) else "--" right = f"{b[offset]:02x}" if offset < len(b) else "--" pairs.append(f"{offset}:{left}->{right}") return " ".join(pairs) def i32_diffs(offsets: list[int], a: bytes, b: bytes) -> str: touched_words = sorted({offset - (offset % 4) for offset in offsets}) parts: list[str] = [] for offset in touched_words: left = i32_at(a, offset) right = i32_at(b, offset) if left is None or right is None or left == right: continue parts.append(f"{offset}:{left}->{right}") return " ".join(parts) def filter_rows( rows: list[dict[str, str]], write_index: str | None, min_complete: float | None, max_complete: float | None, ) -> list[dict[str, str]]: out: list[dict[str, str]] = [] for row in rows: if write_index is not None and row.get("write_index") != write_index: continue if min_complete is not None or max_complete is not None: value = row.get("packet_time_relative_to_complete", "") if value == "": continue relative = float(value) if min_complete is not None and relative < min_complete: continue if max_complete is not None and relative > max_complete: continue out.append(row) return out def compare_rows(a_rows: list[dict[str, str]], b_rows: list[dict[str, str]], max_byte_pairs: int) -> list[dict[str, str]]: a_groups = ordinal_groups(a_rows) b_groups = ordinal_groups(b_rows) out: list[dict[str, str]] = [] for key in sorted(set(a_groups) | set(b_groups)): left_rows = a_groups.get(key, []) right_rows = b_groups.get(key, []) count = max(len(left_rows), len(right_rows)) for ordinal in range(count): if ordinal >= len(left_rows) or ordinal >= len(right_rows): out.append({ "write_a": "" if ordinal >= len(left_rows) else left_rows[ordinal].get("write_index", ""), "write_value_a": "" if ordinal >= len(left_rows) else left_rows[ordinal].get("write_value", ""), "write_b": "" if ordinal >= len(right_rows) else right_rows[ordinal].get("write_index", ""), "write_value_b": "" if ordinal >= len(right_rows) else right_rows[ordinal].get("write_value", ""), "direction": key[0], "record_type": key[1], "record_size": key[2], "ordinal": str(ordinal), "status": "missing_a" if ordinal >= len(left_rows) else "missing_b", "frame_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["frame"], "frame_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["frame"], "time_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["packet_time_relative_to_write"], "time_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["packet_time_relative_to_write"], "signature16_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["signature16"], "signature16_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["signature16"], "bytes_differ": "", "diff_offsets": "", "byte_pairs": "", "i32_diffs": "", "ascii_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["ascii_preview"], "ascii_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["ascii_preview"], }) continue left = left_rows[ordinal] right = right_rows[ordinal] left_body = body_bytes(left) right_body = body_bytes(right) offsets = diff_offsets(left_body, right_body) out.append({ "write_a": left.get("write_index", ""), "write_value_a": left.get("write_value", ""), "write_b": right.get("write_index", ""), "write_value_b": right.get("write_value", ""), "direction": key[0], "record_type": key[1], "record_size": key[2], "ordinal": str(ordinal), "status": "same" if not offsets else "different", "frame_a": left["frame"], "frame_b": right["frame"], "time_a": left["packet_time_relative_to_write"], "time_b": right["packet_time_relative_to_write"], "signature16_a": left["signature16"], "signature16_b": right["signature16"], "bytes_differ": str(len(offsets)), "diff_offsets": " ".join(str(offset) for offset in offsets[:max_byte_pairs]), "byte_pairs": byte_pairs(offsets, left_body, right_body, max_byte_pairs), "i32_diffs": i32_diffs(offsets, left_body, right_body), "ascii_a": left["ascii_preview"], "ascii_b": right["ascii_preview"], }) return out def write_rows(rows: list[dict[str, str]], out: Path) -> None: out.parent.mkdir(parents=True, exist_ok=True) header = [ "write_a", "write_value_a", "write_b", "write_value_b", "direction", "record_type", "record_size", "ordinal", "status", "frame_a", "frame_b", "time_a", "time_b", "signature16_a", "signature16_b", "bytes_differ", "diff_offsets", "byte_pairs", "i32_diffs", "ascii_a", "ascii_b", ] with out.open("w", encoding="utf-8", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n") writer.writeheader() writer.writerows(rows) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("left", type=Path) parser.add_argument("right", type=Path) parser.add_argument("--out", type=Path, default=Path("analysis/network/write-window-body-diff.tsv")) parser.add_argument("--left-write-index") parser.add_argument("--right-write-index") parser.add_argument("--time-complete-min", type=float) parser.add_argument("--time-complete-max", type=float) parser.add_argument("--max-byte-pairs", type=int, default=64) args = parser.parse_args() left_rows = filter_rows(load_rows(args.left), args.left_write_index, args.time_complete_min, args.time_complete_max) right_rows = filter_rows(load_rows(args.right), args.right_write_index, args.time_complete_min, args.time_complete_max) rows = compare_rows(left_rows, right_rows, args.max_byte_pairs) write_rows(rows, args.out) same = sum(1 for row in rows if row["status"] == "same") different = sum(1 for row in rows if row["status"] == "different") missing = len(rows) - same - different print(f"wrote {args.out}") print(f"same={same} different={different} missing={missing}") return 0 if __name__ == "__main__": raise SystemExit(main())