Files
mxaccess/analysis/scripts/extract_nmxsvcps_proc_formats.py
T
Joseph Doherty fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Initial project state: .NET reference, design, Rust port (M0+M1), evidence
Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 06:21:00 -04:00

315 lines
9.1 KiB
Python

from __future__ import annotations
import argparse
import csv
import re
import uuid
from dataclasses import dataclass
from pathlib import Path
import pefile
# Virtual addresses of three parallel tables inside the DLL image. Each table
# is read by parse_interface as an array of 32-bit pointers indexed by
# interface index: proxy vtables, stub vtables, and NUL-terminated ASCII
# interface names respectively.
# NOTE(review): the addresses are hard-coded, presumably for one specific
# build of NmxSvcps.dll — confirm before running against another version.
PROXY_VTBL_LIST = 0x10007CC8
STUB_VTBL_LIST = 0x10007CE4
NAME_LIST = 0x10007D00
# Number of entries main() walks in each of the tables above.
INTERFACE_COUNT = 6
# Method names per interface, listed in the order they are paired with the
# recovered procedure offsets (main() zips each list against the offset-table
# slots that follow the first three).
KNOWN_METHODS = {
    "INmxService2": [
        "RegisterEngine",
        "UnRegisterEngine",
        "Connect",
        "TransferData",
        "AddSubscriberEngine",
        "RemoveSubscriberEngine",
        "SetHeartbeatSendInterval",
        "RegisterEngine2",
        "GetPartnerVersion",
    ],
    "INmxSvcStatistics": [
        "GetNmxSvcStatistics",
        "ResetSvcStatistics",
    ],
    "INmxStatus": [
        # NOTE(review): every other name in this table is PascalCase; confirm
        # whether "OPENCONNECTION" is the real spelling before relying on it.
        "OPENCONNECTION",
        "CloseConnection",
        "GetConnectionStatus",
    ],
    "INmxService": [
        "RegisterEngine",
        "UnRegisterEngine",
        "Connect",
        "TransferData",
        "AddSubscriberEngine",
        "RemoveSubscriberEngine",
        "SetHeartbeatSendInterval",
    ],
    "INmxNotify": [
        "ConnectionEstablished",
        "ConnectionClosed",
    ],
    "INmxSvcCallback": [
        "DataReceived",
        "StatusReceived",
    ],
}
# One-byte format codes -> symbolic FC_* names. type_text() uses this both for
# base-type parameters and to annotate raw type-format bytes; codes that are
# not listed are rendered as plain hex by the callers.
FC_TYPES = {
    # Scalar base types.
    0x02: "FC_CHAR",
    0x03: "FC_SMALL",
    0x04: "FC_USMALL",
    0x05: "FC_WCHAR",
    0x06: "FC_SHORT",
    0x07: "FC_USHORT",
    0x08: "FC_LONG",
    0x09: "FC_ULONG",
    0x0A: "FC_FLOAT",
    0x0B: "FC_HYPER",
    0x0C: "FC_DOUBLE",
    0x0D: "FC_ENUM16",
    0x0E: "FC_ENUM32",
    0x10: "FC_ERROR_STATUS_T",
    # Pointers, structures and arrays.
    0x11: "FC_RP",
    0x12: "FC_UP",
    0x14: "FC_FP",
    0x15: "FC_STRUCT",
    0x1B: "FC_CARRAY",
    0x2F: "FC_IP",
    0x36: "FC_BYTE_COUNT_POINTER",
    # Layout and terminator markers.
    0x46: "FC_NO_REPEAT",
    0x4C: "FC_EMBEDDED_COMPLEX",
    0x5A: "FC_CONSTANT_IID",
    0x5B: "FC_END",
    0x5C: "FC_PAD",
}
# (bit mask, label) pairs for the 16-bit parameter flag word, in the order the
# labels are emitted by flags_text(). Bits without an entry here (0x0004 and
# everything above 0x0100) are reported by flags_text() as a hex remainder.
PARAM_FLAGS = [
    (0x0001, "must_size"),
    (0x0002, "must_free"),
    (0x0008, "in"),
    (0x0010, "out"),
    (0x0020, "return"),
    (0x0040, "base_type"),
    (0x0080, "by_value"),
    (0x0100, "simple_ref"),
]
def u16(data: bytes, offset: int) -> int:
    """Read an unsigned little-endian 16-bit integer from *data* at *offset*.

    Args:
        data: Buffer to read from.
        offset: Zero-based index of the first byte of the field.

    Returns:
        The decoded value in the range 0..0xFFFF.

    Raises:
        ValueError: If *offset* is negative or fewer than two bytes are
            available at *offset*. Slicing would otherwise yield a short (or
            empty) field and silently decode to a wrong value.
    """
    field = data[offset:offset + 2]
    if offset < 0 or len(field) != 2:
        raise ValueError(
            f"cannot read 2 bytes at offset {offset} from a {len(data)}-byte buffer"
        )
    return int.from_bytes(field, "little")
def u32(data: bytes, offset: int) -> int:
    """Read an unsigned little-endian 32-bit integer from *data* at *offset*.

    Args:
        data: Buffer to read from.
        offset: Zero-based index of the first byte of the field.

    Returns:
        The decoded value in the range 0..0xFFFFFFFF.

    Raises:
        ValueError: If *offset* is negative or fewer than four bytes are
            available at *offset*. Slicing would otherwise yield a short (or
            empty) field and silently decode to a wrong value.
    """
    field = data[offset:offset + 4]
    if offset < 0 or len(field) != 4:
        raise ValueError(
            f"cannot read 4 bytes at offset {offset} from a {len(data)}-byte buffer"
        )
    return int.from_bytes(field, "little")
class PeView:
    """Read-only view of a PE file addressed by virtual address.

    The file is parsed with ``pefile``; its raw bytes are copied into
    ``data`` and the image base is remembered so that every accessor can
    take an absolute virtual address.
    """

    def __init__(self, path: Path):
        """Parse the PE file at *path* and snapshot its raw contents."""
        self.pe = pefile.PE(str(path))
        self.data = bytes(self.pe.__data__)
        self.base = self.pe.OPTIONAL_HEADER.ImageBase

    def offset(self, va: int) -> int:
        """Translate virtual address *va* into an offset within ``data``."""
        rva = va - self.base
        return self.pe.get_offset_from_rva(rva)

    def slice(self, va: int, length: int) -> bytes:
        """Return up to *length* raw bytes starting at virtual address *va*."""
        start = self.offset(va)
        stop = start + length
        return self.data[start:stop]

    def u16(self, va: int) -> int:
        """Read a little-endian 16-bit value at virtual address *va*."""
        file_offset = self.offset(va)
        return u16(self.data, file_offset)

    def u32(self, va: int) -> int:
        """Read a little-endian 32-bit value at virtual address *va*."""
        file_offset = self.offset(va)
        return u32(self.data, file_offset)

    def guid(self, va: int) -> str:
        """Read 16 bytes at *va* as a little-endian GUID, upper-cased."""
        raw = self.slice(va, 16)
        parsed = uuid.UUID(bytes_le=raw)
        return str(parsed).upper()

    def asciiz(self, va: int) -> str:
        """Read the NUL-terminated ASCII string that starts at *va*.

        Raises ValueError when no terminator follows, and UnicodeDecodeError
        when the bytes are not pure ASCII.
        """
        start = self.offset(va)
        terminator = self.data.index(b"\x00", start)
        text = self.data[start:terminator]
        return text.decode("ascii")
@dataclass(frozen=True)
class InterfaceInfo:
    """Immutable summary of one interface as recovered by parse_interface.

    Fields ending in ``_va`` are virtual addresses inside the analysed DLL.
    """

    # Position of the interface in the DLL's name / proxy / stub tables.
    index: int
    # Interface name read from the name table.
    name: str
    # Interface identifier rendered as an upper-case GUID string.
    iid: str
    # Method count read from the stub vtable header.
    method_count: int
    # method_count minus the first three slots, never negative.
    user_method_count: int
    # Address stored in the first slot of the proxy vtable.
    proxy_info_va: int
    # Address stored in the first slot of the proxy info structure.
    stub_desc_va: int
    # Base address that per-method procedure offsets are added to.
    proc_format_va: int
    # Address of the table of 16-bit per-method procedure offsets.
    offset_table_va: int
    # Base address that parameter type offsets are added to.
    type_format_va: int
def parse_interface(pe: PeView, index: int) -> InterfaceInfo:
    """Recover the addresses describing interface number *index*.

    Follows the 32-bit pointers in the name, proxy-vtable and stub-vtable
    tables, then the pointers hanging off those structures, and packs the
    results into an InterfaceInfo.
    """
    slot = index * 4  # every table entry is one 32-bit pointer

    interface_name = pe.asciiz(pe.u32(NAME_LIST + slot))
    proxy_vtbl_va = pe.u32(PROXY_VTBL_LIST + slot)
    stub_vtbl_va = pe.u32(STUB_VTBL_LIST + slot)

    # Proxy vtable header: [+0] proxy info pointer, [+4] IID pointer.
    proxy_info_va = pe.u32(proxy_vtbl_va)
    iid_va = pe.u32(proxy_vtbl_va + 4)

    # Stub vtable header: the method count sits at +8.
    total_methods = pe.u32(stub_vtbl_va + 8)

    # Proxy info: [+0] stub descriptor, [+4] proc format, [+8] offset table.
    stub_desc_va = pe.u32(proxy_info_va)
    proc_format_va = pe.u32(proxy_info_va + 4)
    offset_table_va = pe.u32(proxy_info_va + 8)

    # Stub descriptor: the type-format pointer sits at +0x20.
    type_format_va = pe.u32(stub_desc_va + 0x20)

    # The first three slots are not user methods (presumably the inherited
    # IUnknown methods); clamp so a short table never goes negative.
    user_methods = max(0, total_methods - 3)

    return InterfaceInfo(
        index=index,
        name=interface_name,
        iid=pe.guid(iid_va),
        method_count=total_methods,
        user_method_count=user_methods,
        proxy_info_va=proxy_info_va,
        stub_desc_va=stub_desc_va,
        proc_format_va=proc_format_va,
        offset_table_va=offset_table_va,
        type_format_va=type_format_va,
    )
def hex_bytes(data: bytes) -> str:
    """Render *data* as lowercase two-digit hex pairs separated by spaces."""
    return " ".join(f"{octet:02x}" for octet in data)
def safe_name(value: str) -> str:
    """Collapse each run of characters outside ``A-Za-z0-9_.-`` into one ``_``.

    main() uses the result as part of a type-snippet file name.
    """
    unsafe_run = re.compile(r"[^A-Za-z0-9_.-]+")
    return unsafe_run.sub("_", value)
def flags_text(value: int) -> str:
    """Describe a parameter flag word as ``|``-joined labels.

    Labels come from PARAM_FLAGS in table order. Any bits the table does not
    cover are appended as one ``0x....`` hex remainder. Returns ``"none"``
    when nothing is set.
    """
    labels = []
    known_bits = 0
    for bit, label in PARAM_FLAGS:
        known_bits |= bit
        if value & bit:
            labels.append(label)

    leftover = value & ~known_bits
    if leftover:
        labels.append(f"0x{leftover:04x}")

    if not labels:
        return "none"
    return "|".join(labels)
def type_text(type_or_offset: int, type_format_va: int, pe: PeView, is_base_type: bool) -> str:
    """Describe a parameter's type field.

    Args:
        type_or_offset: The 16-bit type field of a parameter descriptor.
        type_format_va: Base address of the interface's type-format data.
        pe: View used to read the referenced type bytes.
        is_base_type: True when *type_or_offset* is itself a format code
            rather than an offset into the type-format data.

    Returns:
        For base types, the FC_* name (or ``FC_0x..`` if unknown). Otherwise
        the offset, the resolved address, and the first 18 bytes found there,
        each shown as an FC_* name when recognised or as hex when not.
    """
    if not is_base_type:
        type_va = type_format_va + type_or_offset
        labelled = [
            FC_TYPES.get(code, f"0x{code:02x}")
            for code in pe.slice(type_va, 18)
        ]
        return f"type+0x{type_or_offset:04x} @{type_va:#010x} [{', '.join(labelled)}]"

    return FC_TYPES.get(type_or_offset, f"FC_0x{type_or_offset:02x}")
def parse_proc(pe: PeView, interface: InterfaceInfo, method_index: int, method_name: str, proc_offset: int) -> dict[str, object]:
    """Decode one procedure header and its parameter descriptors into a TSV row.

    The procedure lives at ``interface.proc_format_va + proc_offset``. A fixed
    24-byte header is read first; byte 15 of it gives the number of 6-byte
    parameter descriptors that follow immediately after.

    NOTE(review): the public NDR header layout suggests byte 16 is an
    extension size and byte 17 a second flags byte; the column names
    ``oi2_flags`` / ``oi2_ext_flags`` are kept as-is — confirm the labels.

    Returns:
        A dict keyed by the TSV column names written by main().
    """
    header_size = 24
    descriptor_size = 6
    base_type_bit = 0x0040  # the "base_type" bit from PARAM_FLAGS

    proc_va = interface.proc_format_va + proc_offset
    header = pe.slice(proc_va, header_size)
    param_count = header[15]

    rendered = []
    for ordinal in range(param_count):
        descriptor_va = proc_va + header_size + ordinal * descriptor_size
        descriptor = pe.slice(descriptor_va, descriptor_size)
        attributes = u16(descriptor, 0)
        stack_offset = u16(descriptor, 2)
        type_field = u16(descriptor, 4)
        attribute_names = flags_text(attributes)
        type_description = type_text(
            type_field,
            interface.type_format_va,
            pe,
            bool(attributes & base_type_bit),
        )
        rendered.append(
            f"p{ordinal}:flags=0x{attributes:04x}({attribute_names}),"
            f"stack={stack_offset},"
            f"type={type_description}"
        )

    row: dict[str, object] = {}
    row["interface_index"] = interface.index
    row["interface_name"] = interface.name
    row["iid"] = interface.iid
    row["method_index"] = method_index
    row["method_name"] = method_name
    row["proc_offset"] = f"0x{proc_offset:04x}"
    row["proc_va"] = f"0x{proc_va:08x}"
    row["opnum"] = u16(header, 6)
    row["x86_stack_size"] = u16(header, 8)
    row["client_buffer_size"] = u16(header, 10)
    row["server_buffer_size"] = u16(header, 12)
    row["proc_flags"] = f"0x{header[14]:02x}"
    row["param_count"] = param_count
    row["oi2_flags"] = f"0x{header[16]:02x}"
    row["oi2_ext_flags"] = f"0x{header[17]:02x}"
    row["raw_header"] = hex_bytes(header)
    row["params"] = "; ".join(rendered)
    return row
def _write_type_snippet(type_dir: Path, pe: PeView, interface: InterfaceInfo) -> None:
    """Write the per-interface summary file with the leading type-format bytes."""
    type_snippet = pe.slice(interface.type_format_va, 0x98)
    target = type_dir / f"{interface.index}-{safe_name(interface.name)}.txt"
    target.write_text(
        "\n".join([
            f"{interface.name}",
            f"iid={interface.iid}",
            f"stub_desc=0x{interface.stub_desc_va:08x}",
            f"proc_format=0x{interface.proc_format_va:08x}",
            f"offset_table=0x{interface.offset_table_va:08x}",
            f"type_format=0x{interface.type_format_va:08x}",
            f"type_bytes={hex_bytes(type_snippet)}",
            "",
        ]),
        encoding="utf-8",
    )


def main() -> int:
    """Extract procedure formats from the proxy/stub DLL and write them out.

    Command-line options:
        --dll       Path of the DLL to analyse.
        --out       TSV file receiving one row per user method.
        --type-dir  Directory receiving one type-format snippet per interface.

    Returns:
        0 on success (used as the process exit status).

    Raises:
        KeyError: If a recovered interface name is missing from KNOWN_METHODS.
        ValueError: If the number of known method names differs from the
            number of user methods recovered for an interface.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dll",
        type=Path,
        default=Path(r"C:\Program Files (x86)\ArchestrA\Framework\Bin\NmxSvcps.dll"),
    )
    parser.add_argument("--out", type=Path, default=Path("analysis/proxy/nmxsvcps-procedures.tsv"))
    parser.add_argument("--type-dir", type=Path, default=Path("analysis/proxy/type-format-snippets"))
    args = parser.parse_args()

    pe = PeView(args.dll)
    interfaces = [parse_interface(pe, i) for i in range(INTERFACE_COUNT)]

    args.out.parent.mkdir(parents=True, exist_ok=True)
    args.type_dir.mkdir(parents=True, exist_ok=True)

    fieldnames = [
        "interface_index",
        "interface_name",
        "iid",
        "method_index",
        "method_name",
        "proc_offset",
        "proc_va",
        "opnum",
        "x86_stack_size",
        "client_buffer_size",
        "server_buffer_size",
        "proc_flags",
        "param_count",
        "oi2_flags",
        "oi2_ext_flags",
        "raw_header",
        "params",
    ]
    # Offset-table slots 0-2 are never used for user methods, so reading
    # starts at slot 3 instead of fetching and discarding them.
    first_user_slot = 3

    with args.out.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, delimiter="\t", fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        for interface in interfaces:
            names = KNOWN_METHODS[interface.name]
            user_offsets = [
                pe.u16(interface.offset_table_va + slot * 2)
                for slot in range(first_user_slot, first_user_slot + interface.user_method_count)
            ]
            if len(names) != len(user_offsets):
                raise ValueError(f"{interface.name} method names do not match recovered offset count")
            for method_index, (method_name, proc_offset) in enumerate(
                zip(names, user_offsets), start=first_user_slot
            ):
                writer.writerow(parse_proc(pe, interface, method_index, method_name, proc_offset))
            _write_type_snippet(args.type_dir, pe, interface)

    print(f"wrote {args.out}")
    print(f"wrote type snippets under {args.type_dir}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)