fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Layout:
- src/ .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
MxAsbClient, probes, tests, harnesses. Executable spec.
- design/ Architectural plan for the Rust port (M0–M6), error
model, protocol invariants, risks (R1–R16), adversarial
review log (review.md).
- rust/ Rust workspace. M0 skeleton + M1 codec parity.
mxaccess-codec: 215 unit tests + 2 cross-implementation
parity tests (byte-identical against .NET reference).
Other crates are M0 stubs awaiting M2+.
- captures/ Frida + netsh + pcap evidence per CLAUDE.md
("captures are evidence, not throwaway logs").
- analysis/ Decompiled C# (frida/proxy/decompiled-*),
Ghidra exports for native DLLs (`exports/` only —
working state at `projects/` and AVEVA's input
binaries at `input/` are gitignored).
- docs/ Reverse-engineering reference docs.
- tools/ Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/ Rust CI: fmt + build + test + clippy on Windows.
- LICENSE MIT (Joseph Doherty, 2026).
Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly
Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
124 lines
4.0 KiB
Python
124 lines
4.0 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import struct
|
|
from pathlib import Path
|
|
|
|
|
|
# Matches ANSI CSI escape sequences: ESC, "[", optional digits/semicolons,
# then a single terminating letter (e.g. the colour codes a terminal emits).
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")


def load_frida_events(path: Path) -> list[dict]:
    """Parse a Frida stdout log into a list of event payload dicts.

    Each line is stripped of ANSI escape sequences, then everything from the
    first ``{`` onward is parsed as JSON. Lines without a ``{`` or with
    invalid JSON are skipped silently.

    For objects whose ``"type"`` is ``"send"`` the value under ``"payload"``
    is used; otherwise the object itself is the payload. A string payload is
    decoded as JSON a second time. Only dict payloads with a truthy
    ``"event"`` key are kept.

    Args:
        path: Log file to read.

    Returns:
        The accepted payload dicts, in file order.

    Raises:
        OSError: If ``path`` cannot be read.
    """
    events: list[dict] = []
    # Decode explicitly as UTF-8 so the result does not depend on the platform
    # locale; undecodable bytes are replaced rather than aborting the run.
    text = path.read_text(encoding="utf-8", errors="replace")
    for raw in text.splitlines():
        line = ANSI_RE.sub("", raw).strip()
        brace = line.find("{")
        if brace < 0:
            continue

        try:
            # The slice starts with "{", so a successful parse is always a dict.
            outer = json.loads(line[brace:])
        except json.JSONDecodeError:
            continue

        payload = outer.get("payload") if outer.get("type") == "send" else outer
        if isinstance(payload, str):
            # The payload may itself be a JSON document serialised as a string.
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError:
                continue

        if isinstance(payload, dict) and payload.get("event"):
            events.append(payload)

    return events
|
|
|
|
|
|
def first_candidate(event: dict, min_size: int = 20, max_size: int = 4096) -> bytes | None:
|
|
for candidate in event.get("candidates") or []:
|
|
hex_text = candidate.get("hex") or ""
|
|
size = int(candidate.get("size") or 0)
|
|
if hex_text and min_size <= size <= max_size:
|
|
return bytes.fromhex(hex_text)
|
|
return None
|
|
|
|
|
|
def extract_needles(events: list[dict], scalar_int: int | None) -> list[tuple[str, bytes]]:
|
|
start = None
|
|
for index, event in enumerate(events):
|
|
if event.get("name", "").startswith("CLMXProxyServer.Write") and event.get("args"):
|
|
start = index
|
|
break
|
|
|
|
needles: list[tuple[str, bytes]] = []
|
|
if scalar_int is not None:
|
|
needles.append((f"int32:{scalar_int}", struct.pack("<i", scalar_int)))
|
|
|
|
if start is None:
|
|
return needles
|
|
|
|
for event in events[start + 1:]:
|
|
name = event.get("name")
|
|
if name not in ("CNmxAdapter.PutRequest", "CNmxAdapter.TransferData", "CNmxAdapter.ProcessDataReceived"):
|
|
continue
|
|
|
|
body = first_candidate(event)
|
|
if body is None:
|
|
continue
|
|
|
|
if name == "CNmxAdapter.PutRequest" and body.startswith(b"\x37\x01"):
|
|
needles.append(("putrequest-body", body))
|
|
elif name == "CNmxAdapter.TransferData" and (scalar_int is None or struct.pack("<i", scalar_int) in body):
|
|
needles.append(("transferdata-body", body))
|
|
elif name == "CNmxAdapter.ProcessDataReceived" and (scalar_int is None or struct.pack("<i", scalar_int) in body):
|
|
needles.append(("callback-body", body))
|
|
|
|
found_names = {name for name, _ in needles}
|
|
if {"putrequest-body", "transferdata-body", "callback-body"}.issubset(found_names):
|
|
break
|
|
|
|
return needles
|
|
|
|
|
|
def main() -> int:
    """Map Frida-captured buffers onto offsets in the captured TCP streams.

    Command line: ``capture_dir [--scalar-int N] [--frida-log PATH]``.

    When ``--frida-log`` is omitted, the first existing file among
    ``frida.stdout.jsonl`` and ``client-frida.stdout.jsonl`` in
    ``capture_dir`` is used, falling back to the first name if neither
    exists (reading it then fails).

    Every needle from ``extract_needles`` is searched for in each
    ``tcp-stream-*.bin`` file in ``capture_dir``, in sorted file order. The
    result is written to ``capture_dir/frida-to-tcp-map.tsv`` with columns
    ``needle``, ``needle_len``, ``stream``, ``offset``: one row per stream
    containing the needle (first occurrence only), or a single row with an
    empty stream and offset ``-1`` when no stream contains it.

    Returns:
        ``0`` on success.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("capture_dir", type=Path)
    parser.add_argument("--scalar-int", type=int)
    parser.add_argument("--frida-log", type=Path)
    args = parser.parse_args()

    frida_log = args.frida_log
    if frida_log is None:
        candidates = [
            args.capture_dir / "frida.stdout.jsonl",
            args.capture_dir / "client-frida.stdout.jsonl",
        ]
        frida_log = next((path for path in candidates if path.exists()), candidates[0])

    events = load_frida_events(frida_log)
    needles = extract_needles(events, args.scalar_int)
    stream_paths = sorted(args.capture_dir.glob("tcp-stream-*.bin"))

    # Read each stream from disk once and test all needles against it, rather
    # than re-reading every stream per needle. Hits are bucketed per needle so
    # the output stays grouped by needle, then by sorted stream name.
    hits: list[list[str]] = [[] for _ in needles]
    for stream_path in stream_paths:
        data = stream_path.read_bytes()
        for rows, (name, needle) in zip(hits, needles):
            offset = data.find(needle)
            if offset >= 0:
                rows.append(f"{name}\t{len(needle)}\t{stream_path.name}\t{offset}")

    lines = ["needle\tneedle_len\tstream\toffset"]
    for rows, (name, needle) in zip(hits, needles):
        # A needle found nowhere still gets one row, with offset -1.
        lines.extend(rows or [f"{name}\t{len(needle)}\t\t-1"])

    output = args.capture_dir / "frida-to-tcp-map.tsv"
    output.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print(f"wrote {output}")
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the process exit
# status. Importing this module does not trigger a run.
if __name__ == "__main__":
    raise SystemExit(main())
|