Files
mxaccess/analysis/scripts/map_frida_to_tcp.py
T
Joseph Doherty fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Initial project state: .NET reference, design, Rust port (M0+M1), evidence
Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 06:21:00 -04:00

124 lines
4.0 KiB
Python

from __future__ import annotations
import argparse
import json
import re
import struct
from pathlib import Path
# Matches escape sequences of the form ESC [ <digits/semicolons> <letter>, so
# they can be removed from captured console lines before JSON parsing.
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")


def load_frida_events(path: Path) -> list[dict]:
    """Load event payloads from a Frida stdout capture file.

    Every line is stripped of ANSI escape sequences, then the text from the
    first ``{`` to the end of the line is parsed as JSON. Objects whose
    ``type`` is ``"send"`` are unwrapped to their ``payload`` (which may itself
    be a JSON-encoded string); any other object is used as-is. Only dict
    payloads with a truthy ``"event"`` key are returned, in file order.

    Lines without a ``{``, or whose JSON cannot be decoded, are skipped.

    Args:
        path: Capture file to read. It is decoded as UTF-8 with undecodable
            bytes replaced, so results do not depend on the platform locale.

    Returns:
        The list of event payload dicts found in the file.

    Raises:
        OSError: If the file cannot be read.
    """
    events: list[dict] = []
    text = path.read_text(encoding="utf-8", errors="replace")
    for raw in text.splitlines():
        line = ANSI_RE.sub("", raw).strip()
        json_start = line.find("{")
        if json_start < 0:
            continue
        try:
            outer = json.loads(line[json_start:])
        except json.JSONDecodeError:
            continue

        # "send" messages wrap the interesting object in a "payload" field.
        payload = outer.get("payload") if outer.get("type") == "send" else outer
        if isinstance(payload, str):
            # The payload may be a JSON document serialized into a string.
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError:
                continue

        if isinstance(payload, dict) and payload.get("event"):
            events.append(payload)
    return events
def first_candidate(event: dict, min_size: int = 20, max_size: int = 4096) -> bytes | None:
for candidate in event.get("candidates") or []:
hex_text = candidate.get("hex") or ""
size = int(candidate.get("size") or 0)
if hex_text and min_size <= size <= max_size:
return bytes.fromhex(hex_text)
return None
def extract_needles(events: list[dict], scalar_int: int | None) -> list[tuple[str, bytes]]:
start = None
for index, event in enumerate(events):
if event.get("name", "").startswith("CLMXProxyServer.Write") and event.get("args"):
start = index
break
needles: list[tuple[str, bytes]] = []
if scalar_int is not None:
needles.append((f"int32:{scalar_int}", struct.pack("<i", scalar_int)))
if start is None:
return needles
for event in events[start + 1:]:
name = event.get("name")
if name not in ("CNmxAdapter.PutRequest", "CNmxAdapter.TransferData", "CNmxAdapter.ProcessDataReceived"):
continue
body = first_candidate(event)
if body is None:
continue
if name == "CNmxAdapter.PutRequest" and body.startswith(b"\x37\x01"):
needles.append(("putrequest-body", body))
elif name == "CNmxAdapter.TransferData" and (scalar_int is None or struct.pack("<i", scalar_int) in body):
needles.append(("transferdata-body", body))
elif name == "CNmxAdapter.ProcessDataReceived" and (scalar_int is None or struct.pack("<i", scalar_int) in body):
needles.append(("callback-body", body))
found_names = {name for name, _ in needles}
if {"putrequest-body", "transferdata-body", "callback-body"}.issubset(found_names):
break
return needles
def main() -> int:
    """Map Frida-captured buffers onto the TCP stream dumps of a capture.

    Command line:
        capture_dir    directory holding the Frida log and ``tcp-stream-*.bin``
        --scalar-int   optional integer; must fit a signed 32-bit value
        --frida-log    optional explicit log path; by default
                       ``frida.stdout.jsonl`` and then
                       ``client-frida.stdout.jsonl`` inside ``capture_dir``
                       are tried

    The needles produced by ``extract_needles`` are searched in every stream
    file (sorted by path). The result is written to
    ``<capture_dir>/frida-to-tcp-map.tsv`` with the columns ``needle``,
    ``needle_len``, ``stream`` and ``offset``: one row per stream that contains
    the needle (offset of its first occurrence), or a single row with an empty
    stream and offset ``-1`` when no stream contains it.

    Returns:
        0 on success. Invalid arguments exit through ``parser.error``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("capture_dir", type=Path)
    parser.add_argument("--scalar-int", type=int)
    parser.add_argument("--frida-log", type=Path)
    args = parser.parse_args()

    # The value is packed with struct format "<i"; reject anything that cannot
    # be packed here instead of failing later with a struct.error traceback.
    if args.scalar_int is not None and not -(2**31) <= args.scalar_int < 2**31:
        parser.error(f"--scalar-int must fit in a signed 32-bit integer, got {args.scalar_int}")

    frida_log = args.frida_log
    if frida_log is None:
        candidates = [
            args.capture_dir / "frida.stdout.jsonl",
            args.capture_dir / "client-frida.stdout.jsonl",
        ]
        frida_log = next((path for path in candidates if path.exists()), candidates[0])
    if not frida_log.is_file():
        parser.error(f"Frida log not found: {frida_log}")

    events = load_frida_events(frida_log)
    needles = extract_needles(events, args.scalar_int)

    # Read each stream once instead of once per needle.
    streams = [
        (stream_path.name, stream_path.read_bytes())
        for stream_path in sorted(args.capture_dir.glob("tcp-stream-*.bin"))
    ]

    lines = ["needle\tneedle_len\tstream\toffset"]
    for name, needle in needles:
        hits = [
            f"{name}\t{len(needle)}\t{stream_name}\t{offset}"
            for stream_name, data in streams
            if (offset := data.find(needle)) >= 0
        ]
        # Needles that match nowhere still get a row, marked with offset -1.
        lines.extend(hits or [f"{name}\t{len(needle)}\t\t-1"])

    output = args.capture_dir / "frida-to-tcp-map.tsv"
    output.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print(f"wrote {output}")
    return 0
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the interpreter as the
    # process exit status.
    exit_status = main()
    raise SystemExit(exit_status)