Files
mxaccess/analysis/scripts/diff_write_window_records.py
T
Joseph Doherty fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Initial project state: .NET reference, design, Rust port (M0+M1), evidence
Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 06:21:00 -04:00

210 lines
8.3 KiB
Python

from __future__ import annotations
import argparse
import csv
import struct
from collections import defaultdict
from pathlib import Path
def load_rows(path: Path) -> list[dict[str, str]]:
    """Read a tab-separated file into a list of per-row dictionaries.

    The first line of the file supplies the column names; each following
    line becomes one dictionary keyed by those names.

    Args:
        path: Location of the UTF-8 encoded, tab-delimited file.

    Returns:
        Every data row of the file, in file order.
    """
    records: list[dict[str, str]] = []
    # newline="" leaves line-ending handling to the csv module.
    with path.open("r", encoding="utf-8", newline="") as stream:
        reader = csv.DictReader(stream, delimiter="\t")
        records.extend(reader)
    return records
def body_bytes(row: dict[str, str]) -> bytes:
    """Decode the hexadecimal text held in a row's ``hex`` column.

    Args:
        row: One input record; its ``"hex"`` entry holds hexadecimal text.

    Returns:
        The bytes spelled out by that text.

    Raises:
        KeyError: If the row has no ``"hex"`` entry.
        ValueError: If the text is not valid hexadecimal.
    """
    hex_text = row["hex"]
    return bytes.fromhex(hex_text)
def ordinal_groups(rows: list[dict[str, str]]) -> dict[tuple[str, str, str], list[dict[str, str]]]:
    """Bucket rows by their (direction, record_type, record_size) triple.

    Rows keep their input order inside each bucket, so a row's position in
    its bucket can be used as its ordinal within that group.

    Args:
        rows: Records carrying ``direction``, ``record_type`` and
            ``record_size`` entries.

    Returns:
        A ``defaultdict`` mapping each triple to the rows that share it.

    Raises:
        KeyError: If a row lacks one of the three grouping columns.
    """
    buckets: dict[tuple[str, str, str], list[dict[str, str]]] = defaultdict(list)
    for record in rows:
        direction = record["direction"]
        record_type = record["record_type"]
        record_size = record["record_size"]
        buckets[(direction, record_type, record_size)].append(record)
    return buckets
def i32_at(data: bytes, offset: int) -> int | None:
    """Read a little-endian signed 32-bit integer from ``data``.

    Args:
        data: Buffer to read from.
        offset: Index of the first of the four bytes.

    Returns:
        The decoded integer, or ``None`` when fewer than four bytes are
        available from ``offset`` to the end of ``data``.
    """
    end = offset + 4
    if end <= len(data):
        (value,) = struct.unpack_from("<i", data, offset)
        return value
    return None
def diff_offsets(a: bytes, b: bytes) -> list[int]:
    """List every byte offset at which two buffers disagree.

    Args:
        a: First buffer.
        b: Second buffer.

    Returns:
        Ascending offsets where the bytes differ. Offsets that exist in
        only one buffer (past the end of the shorter one) are all included.
    """
    shared_length = min(len(a), len(b))
    longest_length = max(len(a), len(b))
    # zip stops at the shorter buffer, so this covers exactly the overlap.
    mismatches = [
        position
        for position, (left_byte, right_byte) in enumerate(zip(a, b))
        if left_byte != right_byte
    ]
    # The tail of the longer buffer has no counterpart: count it as differing.
    mismatches += range(shared_length, longest_length)
    return mismatches
def byte_pairs(offsets: list[int], a: bytes, b: bytes, limit: int) -> str:
    """Render differing bytes as space-separated ``offset:aa->bb`` entries.

    Args:
        offsets: Byte offsets to report.
        a: Buffer supplying the left-hand value of each entry.
        b: Buffer supplying the right-hand value of each entry.
        limit: Only ``offsets[:limit]`` are rendered.

    Returns:
        One entry per reported offset, with each byte as two lowercase hex
        digits, or ``--`` where the offset lies past the end of that buffer.
    """

    def render(buffer: bytes, position: int) -> str:
        # "--" marks a position the buffer does not reach.
        return f"{buffer[position]:02x}" if position < len(buffer) else "--"

    return " ".join(
        f"{position}:{render(a, position)}->{render(b, position)}"
        for position in offsets[:limit]
    )
def i32_diffs(offsets: list[int], a: bytes, b: bytes) -> str:
    """Render the differing 4-byte-aligned words as ``offset:left->right``.

    Each byte offset is rounded down to the start of its 4-byte word; every
    such word is then read from both buffers via ``i32_at`` and reported
    when the two values differ.

    Args:
        offsets: Byte offsets at which the buffers are known to differ.
        a: Buffer supplying the left-hand value.
        b: Buffer supplying the right-hand value.

    Returns:
        Space-separated entries in ascending word order. A word that cannot
        be read in full from both buffers is left out.
    """
    word_starts = sorted({(position // 4) * 4 for position in offsets})
    rendered: list[str] = []
    for start in word_starts:
        before = i32_at(a, start)
        after = i32_at(b, start)
        # Report only words that are readable on both sides and unequal.
        if before is not None and after is not None and before != after:
            rendered.append(f"{start}:{before}->{after}")
    return " ".join(rendered)
def filter_rows(
    rows: list[dict[str, str]],
    write_index: str | None,
    min_complete: float | None,
    max_complete: float | None,
) -> list[dict[str, str]]:
    """Select rows by write index and by time relative to completion.

    Args:
        rows: Records to filter; they are not modified.
        write_index: When given, keep only rows whose ``write_index`` entry
            equals this string.
        min_complete: When given, drop rows whose
            ``packet_time_relative_to_complete`` is below this value.
        max_complete: When given, drop rows whose
            ``packet_time_relative_to_complete`` is above this value.

    Returns:
        The surviving rows, in input order. When either time bound is
        given, rows without a usable time value are dropped.

    Raises:
        ValueError: If a non-blank time value cannot be parsed as a float.
    """
    time_filter_active = min_complete is not None or max_complete is not None
    out: list[dict[str, str]] = []
    for row in rows:
        if write_index is not None and row.get("write_index") != write_index:
            continue
        if time_filter_active:
            value = row.get("packet_time_relative_to_complete")
            # csv.DictReader fills the missing cells of a short line with
            # None, so the key can be present with a None value. Treat that
            # (and a blank/whitespace-only cell) like a missing timestamp
            # instead of letting float() raise.
            if value is None or not value.strip():
                continue
            relative = float(value)
            if min_complete is not None and relative < min_complete:
                continue
            if max_complete is not None and relative > max_complete:
                continue
        out.append(row)
    return out
def compare_rows(a_rows: list[dict[str, str]], b_rows: list[dict[str, str]], max_byte_pairs: int) -> list[dict[str, str]]:
    """Pair up rows from two inputs and describe how their bodies differ.

    Rows are grouped on each side by (direction, record_type, record_size)
    and paired by their position inside the group. A position present on
    only one side produces a ``missing_a`` / ``missing_b`` entry with empty
    diff columns; otherwise the decoded ``hex`` bodies are compared and the
    entry is marked ``same`` or ``different``.

    Args:
        a_rows: Rows of the left-hand input.
        b_rows: Rows of the right-hand input.
        max_byte_pairs: Cap on how many offsets are listed in the
            ``diff_offsets`` and ``byte_pairs`` columns.

    Returns:
        One output row per (group, ordinal), with groups visited in sorted
        key order.
    """

    def optional(row: dict[str, str] | None, column: str) -> str:
        # Columns that may be absent from the input fall back to "".
        return "" if row is None else row.get(column, "")

    def required(row: dict[str, str] | None, column: str) -> str:
        # Blank only when the whole side is absent; otherwise the column must exist.
        return "" if row is None else row[column]

    left_groups = ordinal_groups(a_rows)
    right_groups = ordinal_groups(b_rows)
    results: list[dict[str, str]] = []

    for group_key in sorted(left_groups.keys() | right_groups.keys()):
        lefts = left_groups.get(group_key, [])
        rights = right_groups.get(group_key, [])
        for ordinal in range(max(len(lefts), len(rights))):
            left = lefts[ordinal] if ordinal < len(lefts) else None
            right = rights[ordinal] if ordinal < len(rights) else None
            paired = left is not None and right is not None

            if paired:
                left_body = body_bytes(left)
                right_body = body_bytes(right)
                offsets = diff_offsets(left_body, right_body)
                status = "different" if offsets else "same"
            elif left is None:
                status = "missing_a"
            else:
                status = "missing_b"

            entry = {
                "write_a": optional(left, "write_index"),
                "write_value_a": optional(left, "write_value"),
                "write_b": optional(right, "write_index"),
                "write_value_b": optional(right, "write_value"),
                "direction": group_key[0],
                "record_type": group_key[1],
                "record_size": group_key[2],
                "ordinal": str(ordinal),
                "status": status,
                "frame_a": required(left, "frame"),
                "frame_b": required(right, "frame"),
                "time_a": required(left, "packet_time_relative_to_write"),
                "time_b": required(right, "packet_time_relative_to_write"),
                "signature16_a": required(left, "signature16"),
                "signature16_b": required(right, "signature16"),
            }

            if paired:
                entry["bytes_differ"] = str(len(offsets))
                entry["diff_offsets"] = " ".join(str(offset) for offset in offsets[:max_byte_pairs])
                entry["byte_pairs"] = byte_pairs(offsets, left_body, right_body, max_byte_pairs)
                entry["i32_diffs"] = i32_diffs(offsets, left_body, right_body)
            else:
                # Nothing to compare against: leave the diff columns blank.
                entry["bytes_differ"] = ""
                entry["diff_offsets"] = ""
                entry["byte_pairs"] = ""
                entry["i32_diffs"] = ""

            entry["ascii_a"] = required(left, "ascii_preview")
            entry["ascii_b"] = required(right, "ascii_preview")
            results.append(entry)

    return results
def write_rows(rows: list[dict[str, str]], out: Path) -> None:
    """Write comparison rows to a tab-separated file with a fixed header.

    Missing parent directories of ``out`` are created. The file is written
    as UTF-8 with ``\\n`` line endings, header line first.

    Args:
        rows: Records to write; keys must be drawn from the fixed column
            set, and columns a record lacks are written as empty cells.
        out: Destination path; an existing file is overwritten.
    """
    columns = (
        "write_a",
        "write_value_a",
        "write_b",
        "write_value_b",
        "direction",
        "record_type",
        "record_size",
        "ordinal",
        "status",
        "frame_a",
        "frame_b",
        "time_a",
        "time_b",
        "signature16_a",
        "signature16_b",
        "bytes_differ",
        "diff_offsets",
        "byte_pairs",
        "i32_diffs",
        "ascii_a",
        "ascii_b",
    )
    out.parent.mkdir(parents=True, exist_ok=True)
    # newline="" so the csv writer's lineterminator is emitted untouched.
    with out.open("w", encoding="utf-8", newline="") as stream:
        tsv = csv.DictWriter(stream, fieldnames=columns, delimiter="\t", lineterminator="\n")
        tsv.writeheader()
        for record in rows:
            tsv.writerow(record)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: diff two TSV inputs and write the result.

    Both inputs are loaded, optionally filtered by write index and by the
    time-relative-to-complete window, compared row by row, and the
    comparison is written to ``--out``. A one-line summary of same /
    different / missing counts is printed.

    Args:
        argv: Argument list to parse instead of the process arguments.
            ``None`` (the default) makes argparse read ``sys.argv[1:]``,
            which is the behaviour when run as a script.

    Returns:
        ``0`` on completion.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("left", type=Path)
    parser.add_argument("right", type=Path)
    parser.add_argument("--out", type=Path, default=Path("analysis/network/write-window-body-diff.tsv"))
    parser.add_argument("--left-write-index")
    parser.add_argument("--right-write-index")
    parser.add_argument("--time-complete-min", type=float)
    parser.add_argument("--time-complete-max", type=float)
    parser.add_argument("--max-byte-pairs", type=int, default=64)
    args = parser.parse_args(argv)

    # The same time window applies to both sides; the write index is per side.
    left_rows = filter_rows(
        load_rows(args.left),
        args.left_write_index,
        args.time_complete_min,
        args.time_complete_max,
    )
    right_rows = filter_rows(
        load_rows(args.right),
        args.right_write_index,
        args.time_complete_min,
        args.time_complete_max,
    )

    rows = compare_rows(left_rows, right_rows, args.max_byte_pairs)
    write_rows(rows, args.out)

    same = sum(1 for row in rows if row["status"] == "same")
    different = sum(1 for row in rows if row["status"] == "different")
    # Every remaining row is a missing_a / missing_b entry.
    missing = len(rows) - same - different
    print(f"wrote {args.out}")
    print(f"same={same} different={different} missing={missing}")
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    raise SystemExit(exit_status)