Files
mxaccess/analysis/scripts/inspect_pe_rpc.py
T
Joseph Doherty fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Initial project state: .NET reference, design, Rust port (M0+M1), evidence
Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 06:21:00 -04:00

212 lines
6.3 KiB
Python

from __future__ import annotations
import argparse
import datetime as dt
import re
import uuid
from pathlib import Path
import pefile
# Runs of at least five consecutive printable ASCII bytes (0x20-0x7e).
ASCII_RE = re.compile(rb"[\x20-\x7e]{5,}")
# Runs of at least five printable ASCII bytes that are each followed by a NUL
# byte, i.e. the byte layout of ASCII-range text stored as UTF-16LE.
UTF16_RE = re.compile(rb"(?:[\x20-\x7e]\x00){5,}")
def read_c_string(data: bytes, offset: int, limit: int = 128) -> str:
    """Decode the NUL-terminated ASCII string that starts at ``offset``.

    At most ``limit`` bytes are examined. If no NUL byte occurs inside that
    window, the whole window is decoded. Bytes outside the ASCII range are
    substituted with the Unicode replacement character.
    """
    window = data[offset:offset + limit]
    # partition() hands back the entire window when no terminator is present.
    text_bytes, _separator, _rest = window.partition(b"\x00")
    return text_bytes.decode("ascii", errors="replace")
def timestamp_text(value: int) -> str:
    """Render a Unix timestamp as an ISO-8601 string in UTC.

    Args:
        value: Seconds since the Unix epoch.

    Returns:
        The ISO-8601 text of the timestamp, or ``str(value)`` when the value
        cannot be represented as a ``datetime``.
    """
    try:
        moment = dt.datetime.fromtimestamp(value, tz=dt.timezone.utc)
    except (OverflowError, OSError, ValueError):
        # Out-of-range timestamps are reported verbatim rather than aborting
        # the report; unrelated errors are allowed to propagate.
        return str(value)
    return moment.isoformat()
def hex_context(data: bytes, offset: int, size: int) -> str:
    """Return a space-separated hex dump of the bytes surrounding ``offset``.

    The window covers ``size`` bytes before ``offset`` and ``size`` bytes from
    ``offset`` onward, clamped to the bounds of ``data``.
    """
    lower = offset - size
    if lower < 0:
        lower = 0
    upper = offset + size
    if upper > len(data):
        upper = len(data)
    window = data[lower:upper]
    return window.hex(" ")
def collect_strings(data: bytes, needles: list[str]) -> list[str]:
    """Find printable strings in ``data`` that contain any of ``needles``.

    Both ASCII runs and UTF-16LE runs are scanned. Matching is
    case-insensitive, and the result is de-duplicated and sorted.
    """
    wanted = [needle.lower() for needle in needles]
    found: set[str] = set()
    scans = ((ASCII_RE, "ascii"), (UTF16_RE, "utf-16le"))
    for pattern, encoding in scans:
        for blob in pattern.findall(data):
            candidate = blob.decode(encoding, errors="replace")
            folded = candidate.lower()
            if any(term in folded for term in wanted):
                found.add(candidate)
    return sorted(found)
def format_exports(pe: pefile.PE) -> list[str]:
    """Render the export directory of ``pe`` as Markdown table rows.

    Returns ``["(none)"]`` when ``pe`` has no ``DIRECTORY_ENTRY_EXPORT``
    attribute. Exports without a name get an empty name cell.
    """
    if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
        return ["(none)"]

    def render(exported) -> str:
        raw_name = exported.name
        label = raw_name.decode("utf-8", errors="replace") if raw_name else ""
        return f"| {exported.ordinal} | 0x{exported.address:08x} | `{label}` |"

    table = ["| Ordinal | RVA | Name |", "| ---: | ---: | --- |"]
    table.extend(render(exported) for exported in pe.DIRECTORY_ENTRY_EXPORT.symbols)
    return table
def format_imports(pe: pefile.PE) -> list[str]:
    """Render the import directory of ``pe`` as one Markdown bullet per DLL.

    Returns ``["(none)"]`` when ``pe`` has no ``DIRECTORY_ENTRY_IMPORT``
    attribute. Imports without a name are shown as ``ord_<ordinal>``.
    """
    if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
        return ["(none)"]
    bullets: list[str] = []
    for descriptor in pe.DIRECTORY_ENTRY_IMPORT:
        library = descriptor.dll.decode("utf-8", errors="replace")
        symbols = [
            imported.name.decode("utf-8", errors="replace")
            if imported.name
            else f"ord_{imported.ordinal}"
            for imported in descriptor.imports
        ]
        quoted = ", ".join(f"`{symbol}`" for symbol in symbols)
        bullets.append(f"- `{library}`: " + quoted)
    return bullets
def format_resources(pe: pefile.PE) -> list[str]:
    """Render the resource tree of ``pe`` as Markdown table rows.

    The tree is walked three levels deep (type, ID/name, language). One row is
    emitted per language-level entry that carries a data record.

    Returns:
        ``["(none)"]`` when ``pe`` has no ``DIRECTORY_ENTRY_RESOURCE``
        attribute, otherwise the table header followed by the data rows.
    """
    if not hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
        return ["(none)"]

    def label(entry) -> str:
        # Named entries use their name; all others fall back to the numeric ID.
        return str(entry.name) if entry.name is not None else str(entry.struct.Id)

    rows = ["| Type | ID/name | Lang | RVA | Size |", "| --- | --- | ---: | ---: | ---: |"]
    for type_entry in pe.DIRECTORY_ENTRY_RESOURCE.entries:
        if not hasattr(type_entry, "directory"):
            continue
        type_name = label(type_entry)
        for name_entry in type_entry.directory.entries:
            if not hasattr(name_entry, "directory"):
                continue
            name = label(name_entry)
            for lang_entry in name_entry.directory.entries:
                # Same defensive check as the two levels above: an entry that
                # is itself a directory has no data record, and must not abort
                # the whole report.
                if not hasattr(lang_entry, "data"):
                    continue
                data = lang_entry.data.struct
                rows.append(
                    f"| `{type_name}` | `{name}` | {lang_entry.struct.Id} | "
                    f"0x{data.OffsetToData:08x} | {data.Size} |"
                )
    return rows
def guid_hits(data: bytes, guid_text: str) -> list[str]:
    """List every occurrence of a GUID in ``data`` as Markdown bullets.

    The GUID is searched for in six encodings: lower/upper-case ASCII text,
    lower/upper-case UTF-16LE text, and the 16 raw bytes in both little-endian
    and big-endian field order. Overlapping occurrences are all reported.

    Args:
        data: Raw file contents to search.
        guid_text: GUID in any form accepted by ``uuid.UUID``.

    Returns:
        One bullet per hit, with the encoding label, the file offset and a hex
        dump of the surrounding bytes. Empty when nothing matches.

    Raises:
        ValueError: If ``guid_text`` is not a valid GUID.
    """
    value = uuid.UUID(guid_text)
    canonical = str(value)
    shouted = canonical.upper()
    candidates = [
        ("text-lower", canonical.encode("ascii")),
        ("text-upper", shouted.encode("ascii")),
        ("utf16-lower", canonical.encode("utf-16le")),
        ("utf16-upper", shouted.encode("utf-16le")),
        ("guid-bytes-le", value.bytes_le),
        ("guid-bytes-be", value.bytes),
    ]
    # A GUID without a-f digits has identical lower/upper encodings, and a
    # byte-symmetric GUID has identical LE/BE forms. Search each distinct byte
    # pattern once so that a single occurrence is not reported twice.
    seen: set[bytes] = set()
    patterns: list[tuple[str, bytes]] = []
    for label, pattern in candidates:
        if pattern in seen:
            continue
        seen.add(pattern)
        patterns.append((label, pattern))

    lines: list[str] = []
    for label, pattern in patterns:
        offset = data.find(pattern)
        while offset >= 0:
            lines.append(
                f"- `{guid_text}` `{label}` at file offset `0x{offset:08x}`: "
                f"`{hex_context(data, offset, 32)}`"
            )
            # Advance by one byte so overlapping matches are still found.
            offset = data.find(pattern, offset + 1)
    return lines
def report_one(path: Path, guids: list[str]) -> str:
    """Build the Markdown report for a single PE file.

    The report contains header facts (size, machine, timestamp, image base),
    the export, import and resource tables, every occurrence of the requested
    GUIDs, and up to 200 strings matching a fixed keyword list plus the GUIDs.

    Args:
        path: Location of the PE file to inspect.
        guids: GUID strings to search for in the raw file bytes.

    Returns:
        The full report text, terminated by a newline.
    """
    data = path.read_bytes()
    # Parse from the bytes already in memory instead of handing pefile the
    # path: this avoids reading the file a second time and leaves no
    # memory-mapped file open after the report is built. With fast_load=False
    # the constructor performs the full load, data directories included, so no
    # separate parse_data_directories() call is needed.
    pe = pefile.PE(data=data, fast_load=False)

    machine = pe.FILE_HEADER.Machine
    if machine == 0x8664:
        bitness = "x64"
    elif machine == 0x14c:
        bitness = "x86"
    else:
        bitness = f"machine 0x{machine:04x}"

    lines = [
        f"# {path.name}",
        "",
        f"- Path: `{path}`",
        f"- Size: {len(data)} bytes",
        f"- Machine: {bitness}",
        f"- PE timestamp: {timestamp_text(pe.FILE_HEADER.TimeDateStamp)}",
        f"- ImageBase: `0x{pe.OPTIONAL_HEADER.ImageBase:08x}`",
        "",
        "## Exports",
        "",
        *format_exports(pe),
        "",
        "## Imports",
        "",
        *format_imports(pe),
        "",
        "## Resources",
        "",
        *format_resources(pe),
        "",
        "## GUID hits",
        "",
    ]

    hit_lines: list[str] = []
    for guid in guids:
        hit_lines.extend(guid_hits(data, guid))
    lines.extend(hit_lines or ["(none)"])

    # Keywords are matched case-insensitively by collect_strings; the GUIDs
    # are included so their textual forms show up in the strings section.
    needles = [
        "ndr",
        "proxy",
        "stub",
        "rpc",
        "interface",
        "nmx",
        "lmx",
        "putrequest",
        "getresponse",
        "registeritems",
        "write",
        *guids,
    ]
    strings = collect_strings(data, needles)
    lines.extend(["", "## Interesting strings", ""])
    # Cap the listing at 200 entries and summarize the remainder.
    lines.extend(f"- `{text}`" for text in strings[:200])
    if len(strings) > 200:
        lines.append(f"- ... {len(strings) - 200} more")
    return "\n".join(lines) + "\n"
def main() -> int:
    """Command-line entry point: report on each PE file given as an argument.

    With ``--out-dir`` each report is written to ``<out-dir>/<file name>.md``;
    otherwise reports are printed to standard output. ``--guid`` may be
    repeated. Always returns 0.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("pe", type=Path, nargs="+")
    cli.add_argument("--guid", action="append", default=[])
    cli.add_argument("--out-dir", type=Path)
    options = cli.parse_args()

    destination = options.out_dir
    if destination:
        destination.mkdir(parents=True, exist_ok=True)

    for pe_path in options.pe:
        markdown = report_one(pe_path, options.guid)
        if not destination:
            print(markdown)
            continue
        target = destination / f"{pe_path.name}.md"
        target.write_text(markdown, encoding="utf-8")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())