fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Layout:
- src/ .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
MxAsbClient, probes, tests, harnesses. Executable spec.
- design/ Architectural plan for the Rust port (M0–M6), error
model, protocol invariants, risks (R1–R16), adversarial
review log (review.md).
- rust/ Rust workspace. M0 skeleton + M1 codec parity.
mxaccess-codec: 215 unit tests + 2 cross-implementation
parity tests (byte-identical against .NET reference).
Other crates are M0 stubs awaiting M2+.
- captures/ Frida + netsh + pcap evidence per CLAUDE.md
("captures are evidence, not throwaway logs").
- analysis/ Decompiled C# (frida/proxy/decompiled-*),
Ghidra exports for native DLLs (`exports/` only —
working state at `projects/` and AVEVA's input
binaries at `input/` are gitignored).
- docs/ Reverse-engineering reference docs.
- tools/ Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/ Rust CI: fmt + build + test + clippy on Windows.
- LICENSE MIT (Joseph Doherty, 2026).
Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly
Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
315 lines
9.1 KiB
Python
315 lines
9.1 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import re
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
import pefile
|
|
|
|
|
|
# Hard-coded virtual addresses (image base included) of three parallel
# per-interface pointer tables. parse_interface() reads the 32-bit entry at
# ``table + index * 4`` from each of them.
# NOTE(review): presumably valid for one specific build of the DLL only --
# confirm before pointing the script at a different binary.
PROXY_VTBL_LIST = 0x1000_7CC8  # entries point at proxy vtables
STUB_VTBL_LIST = 0x1000_7CE4  # entries point at stub vtables
NAME_LIST = 0x1000_7D00  # entries point at NUL-terminated ASCII names
INTERFACE_COUNT = 6  # number of entries main() walks in each table
|
|
|
|
# Names shared verbatim by INmxService and the start of INmxService2.
_NMX_SERVICE_METHODS = (
    "RegisterEngine",
    "UnRegisterEngine",
    "Connect",
    "TransferData",
    "AddSubscriberEngine",
    "RemoveSubscriberEngine",
    "SetHeartbeatSendInterval",
)

# Interface name -> method names, in the order main() pairs them with the
# proc-format offsets that follow the first three vtable slots.  main() raises
# if a list's length differs from the recovered user-method count.
KNOWN_METHODS = {
    "INmxService2": [*_NMX_SERVICE_METHODS, "RegisterEngine2", "GetPartnerVersion"],
    "INmxSvcStatistics": ["GetNmxSvcStatistics", "ResetSvcStatistics"],
    # NOTE(review): "OPENCONNECTION" is upper-case unlike its siblings; it is
    # only a label in the TSV output -- confirm against the IDL/type library.
    "INmxStatus": ["OPENCONNECTION", "CloseConnection", "GetConnectionStatus"],
    "INmxService": list(_NMX_SERVICE_METHODS),
    "INmxNotify": ["ConnectionEstablished", "ConnectionClosed"],
    "INmxSvcCallback": ["DataReceived", "StatusReceived"],
}
|
|
|
|
# NDR format-character codes -> MIDL names, used to label base-type parameters
# and to annotate raw type-format bytes.  Only a subset of the enumeration is
# listed; unlisted codes are rendered as hex by the callers.
FC_TYPES = {
    0x02: "FC_CHAR",
    0x03: "FC_SMALL",
    0x04: "FC_USMALL",
    0x05: "FC_WCHAR",
    0x06: "FC_SHORT",
    0x07: "FC_USHORT",
    0x08: "FC_LONG",
    0x09: "FC_ULONG",
    0x0A: "FC_FLOAT",
    0x0B: "FC_HYPER",
    0x0C: "FC_DOUBLE",
    0x0D: "FC_ENUM16",
    0x0E: "FC_ENUM32",
    0x10: "FC_ERROR_STATUS_T",
    0x11: "FC_RP",
    0x12: "FC_UP",
    0x14: "FC_FP",
    0x15: "FC_STRUCT",
    0x1B: "FC_CARRAY",
    # FC_BYTE_COUNT_POINTER is code 0x2C in the NDR enumeration; it was
    # previously attached to 0x36 by mistake.
    0x2C: "FC_BYTE_COUNT_POINTER",
    0x2F: "FC_IP",
    # 0x36 is FC_POINTER (embedded-pointer placeholder), not a byte-count pointer.
    0x36: "FC_POINTER",
    0x46: "FC_NO_REPEAT",
    0x4C: "FC_EMBEDDED_COMPLEX",
    0x5A: "FC_CONSTANT_IID",
    0x5B: "FC_END",
    0x5C: "FC_PAD",
}
|
|
|
|
# (bit mask, label) pairs for the 16-bit parameter flags word decoded by
# flags_text(); bits not listed here are reported there as a hex remainder.
PARAM_FLAGS = [
    (1 << 0, "must_size"),
    (1 << 1, "must_free"),
    (1 << 3, "in"),
    (1 << 4, "out"),
    (1 << 5, "return"),
    (1 << 6, "base_type"),
    (1 << 7, "by_value"),
    (1 << 8, "simple_ref"),
]
|
|
|
|
|
|
def u16(data: bytes, offset: int) -> int:
    """Decode the little-endian unsigned 16-bit value at ``offset`` in ``data``.

    The read is slice-based: a window that runs past the end of ``data`` is
    decoded from whatever bytes remain (an empty window yields 0) instead of
    raising.
    """
    window = data[offset:offset + 2]
    return int.from_bytes(window, byteorder="little")
|
|
|
|
|
|
def u32(data: bytes, offset: int) -> int:
    """Decode the little-endian unsigned 32-bit value at ``offset`` in ``data``.

    The read is slice-based: a window that runs past the end of ``data`` is
    decoded from whatever bytes remain (an empty window yields 0) instead of
    raising.
    """
    window = data[offset:offset + 4]
    return int.from_bytes(window, byteorder="little")
|
|
|
|
|
|
class PeView:
    """Read-only view of a PE file addressed by virtual address (VA).

    Every accessor takes a VA that includes the image base, converts it to a
    file offset through pefile, and reads from an in-memory copy of the file.
    """

    def __init__(self, path: Path):
        """Parse the PE at ``path`` and snapshot its raw bytes and image base."""
        parsed = pefile.PE(str(path))
        self.pe = parsed
        self.data = bytes(parsed.__data__)
        self.base = parsed.OPTIONAL_HEADER.ImageBase

    def offset(self, va: int) -> int:
        """Return the file offset that pefile maps ``va`` to."""
        rva = va - self.base
        return self.pe.get_offset_from_rva(rva)

    def slice(self, va: int, length: int) -> bytes:
        """Return up to ``length`` bytes starting at ``va``.

        Fewer bytes come back when the window runs past the end of the file.
        """
        start = self.offset(va)
        stop = start + length
        return self.data[start:stop]

    def u16(self, va: int) -> int:
        """Read a little-endian 16-bit value at ``va``."""
        file_offset = self.offset(va)
        return u16(self.data, file_offset)

    def u32(self, va: int) -> int:
        """Read a little-endian 32-bit value at ``va``."""
        file_offset = self.offset(va)
        return u32(self.data, file_offset)

    def guid(self, va: int) -> str:
        """Read 16 bytes at ``va`` as a little-endian GUID, upper-cased."""
        raw = self.slice(va, 16)
        parsed = uuid.UUID(bytes_le=raw)
        return str(parsed).upper()

    def asciiz(self, va: int) -> str:
        """Read the NUL-terminated ASCII string that starts at ``va``.

        Raises ValueError when no NUL byte follows, and UnicodeDecodeError
        when the bytes before it are not ASCII.
        """
        begin = self.offset(va)
        terminator = self.data.index(b"\x00", begin)
        text = self.data[begin:terminator]
        return text.decode("ascii")
|
|
|
|
|
|
@dataclass(frozen=True)
class InterfaceInfo:
    """Immutable record of what parse_interface() recovers for one interface.

    Every ``*_va`` field is a virtual address (image base included), not a
    file offset.
    """

    index: int  # position in the proxy/stub/name pointer tables
    name: str  # ASCII name read through NAME_LIST
    iid: str  # upper-case GUID string
    method_count: int  # 32-bit value read at stub vtable + 8
    user_method_count: int  # method_count minus the first 3 slots, floored at 0
    proxy_info_va: int  # pointer stored at proxy vtable + 0
    stub_desc_va: int  # pointer stored at proxy info + 0
    proc_format_va: int  # pointer stored at proxy info + 4
    offset_table_va: int  # pointer stored at proxy info + 8; u16 entries, one per method
    type_format_va: int  # pointer stored at stub descriptor + 0x20
|
|
|
|
|
|
def parse_interface(pe: PeView, index: int) -> InterfaceInfo:
    """Recover the metadata of the ``index``-th interface from the image.

    Follows the ``index``-th 32-bit pointer of the name, proxy-vtable and
    stub-vtable tables, then chases the pointers stored there:

    * proxy vtable +0 -> proxy info, +4 -> IID
    * stub vtable +8  -> method count
    * proxy info +0 -> stub descriptor, +4 -> proc format, +8 -> offset table
    * stub descriptor +0x20 -> type format

    ``user_method_count`` drops the first three method slots
    (NOTE(review): presumably the IUnknown methods -- confirm) and never goes
    below zero.
    """
    slot = index * 4  # tables hold 32-bit pointers

    interface_name = pe.asciiz(pe.u32(NAME_LIST + slot))
    proxy_vtbl_va = pe.u32(PROXY_VTBL_LIST + slot)
    stub_vtbl_va = pe.u32(STUB_VTBL_LIST + slot)

    proxy_info_va = pe.u32(proxy_vtbl_va)
    iid_va = pe.u32(proxy_vtbl_va + 4)
    total_methods = pe.u32(stub_vtbl_va + 8)

    stub_desc_va = pe.u32(proxy_info_va)
    proc_format_va = pe.u32(proxy_info_va + 4)
    offset_table_va = pe.u32(proxy_info_va + 8)
    type_format_va = pe.u32(stub_desc_va + 0x20)

    user_methods = total_methods - 3
    if user_methods < 0:
        user_methods = 0

    return InterfaceInfo(
        index=index,
        name=interface_name,
        iid=pe.guid(iid_va),
        method_count=total_methods,
        user_method_count=user_methods,
        proxy_info_va=proxy_info_va,
        stub_desc_va=stub_desc_va,
        proc_format_va=proc_format_va,
        offset_table_va=offset_table_va,
        type_format_va=type_format_va,
    )
|
|
|
|
|
|
def hex_bytes(data: bytes) -> str:
    """Render ``data`` as lower-case two-digit hex bytes separated by spaces."""
    return " ".join(f"{byte:02x}" for byte in data)
|
|
|
|
|
|
def safe_name(value: str) -> str:
    """Collapse each run of characters outside ``A-Za-z0-9_.-`` into one ``_``."""
    unsafe_run = re.compile(r"[^A-Za-z0-9_.-]+")
    return unsafe_run.sub("_", value)
|
|
|
|
|
|
def flags_text(value: int) -> str:
    """Describe a parameter flags word as ``|``-joined labels.

    Each bit listed in PARAM_FLAGS contributes its label, in table order.
    Any bits left over are appended as one ``0x%04x`` literal.  Returns
    ``"none"`` when nothing is set.
    """
    known_mask = sum(bit for bit, _ in PARAM_FLAGS)
    labels = []
    for bit, label in PARAM_FLAGS:
        if value & bit:
            labels.append(label)

    leftover = value & ~known_mask
    if leftover:
        labels.append(f"0x{leftover:04x}")

    if not labels:
        return "none"
    return "|".join(labels)
|
|
|
|
|
|
def type_text(type_or_offset: int, type_format_va: int, pe: PeView, is_base_type: bool) -> str:
    """Describe a parameter's type field.

    For a base type, ``type_or_offset`` is itself a format code and is
    returned as its FC_TYPES name (or ``FC_0x..`` when unlisted).

    Otherwise it is an offset into the type format string: the result names
    the offset, the resulting VA, and the next 18 bytes found there.  Every
    one of those bytes is looked up in FC_TYPES, so operand bytes that happen
    to equal a format code are shown under that code's name too.
    """
    if is_base_type:
        fallback = f"FC_0x{type_or_offset:02x}"
        return FC_TYPES.get(type_or_offset, fallback)

    type_va = type_format_va + type_or_offset
    labels = [FC_TYPES.get(code, f"0x{code:02x}") for code in pe.slice(type_va, 18)]
    return f"type+0x{type_or_offset:04x} @{type_va:#010x} [{', '.join(labels)}]"
|
|
|
|
|
|
def parse_proc(pe: PeView, interface: InterfaceInfo, method_index: int, method_name: str, proc_offset: int) -> dict[str, object]:
    """Decode one procedure's format-string entry into a TSV row.

    Reads a fixed 24-byte header at ``proc_format_va + proc_offset`` and then
    ``header[15]`` six-byte parameter descriptors (u16 flags, u16 stack
    offset, u16 type code or type-format offset) that follow it directly.

    NOTE(review): assumes every procedure uses this 24-byte header shape --
    confirm before reusing on other binaries.

    Returns a dict keyed by the TSV column names written by main().
    """
    header_size = 24
    descriptor_size = 6
    base_type_bit = 0x0040  # the type field is a format code, not an offset

    proc_va = interface.proc_format_va + proc_offset
    header = pe.slice(proc_va, header_size)
    param_count = header[15]

    rendered = []
    for number in range(param_count):
        descriptor_va = proc_va + header_size + number * descriptor_size
        descriptor = pe.slice(descriptor_va, descriptor_size)
        flags = u16(descriptor, 0)
        stack_offset = u16(descriptor, 2)
        type_field = u16(descriptor, 4)
        described_type = type_text(
            type_field,
            interface.type_format_va,
            pe,
            bool(flags & base_type_bit),
        )
        rendered.append(
            f"p{number}:flags=0x{flags:04x}({flags_text(flags)}),"
            f"stack={stack_offset},"
            f"type={described_type}"
        )

    row: dict[str, object] = {}
    row["interface_index"] = interface.index
    row["interface_name"] = interface.name
    row["iid"] = interface.iid
    row["method_index"] = method_index
    row["method_name"] = method_name
    row["proc_offset"] = f"0x{proc_offset:04x}"
    row["proc_va"] = f"0x{proc_va:08x}"
    row["opnum"] = u16(header, 6)
    row["x86_stack_size"] = u16(header, 8)
    row["client_buffer_size"] = u16(header, 10)
    row["server_buffer_size"] = u16(header, 12)
    row["proc_flags"] = f"0x{header[14]:02x}"
    row["param_count"] = param_count
    row["oi2_flags"] = f"0x{header[16]:02x}"
    row["oi2_ext_flags"] = f"0x{header[17]:02x}"
    row["raw_header"] = hex_bytes(header)
    row["params"] = "; ".join(rendered)
    return row
|
|
|
|
|
|
def main() -> int:
    """Dump per-procedure metadata from the DLL to a TSV plus type snippets.

    Command-line options: ``--dll`` (input PE), ``--out`` (TSV path) and
    ``--type-dir`` (directory for one text file per interface holding the
    interface's addresses and the first 0x98 bytes of its type format
    string).  Output directories are created as needed.

    Raises ValueError when an interface's KNOWN_METHODS list does not match
    the number of user-method offsets recovered from the image.

    Returns 0 on success.
    """
    parser = argparse.ArgumentParser()
    path_options = (
        ("--dll", r"C:\Program Files (x86)\ArchestrA\Framework\Bin\NmxSvcps.dll"),
        ("--out", "analysis/proxy/nmxsvcps-procedures.tsv"),
        ("--type-dir", "analysis/proxy/type-format-snippets"),
    )
    for flag, default_path in path_options:
        parser.add_argument(flag, type=Path, default=Path(default_path))
    args = parser.parse_args()

    pe = PeView(args.dll)
    interfaces = []
    for table_index in range(INTERFACE_COUNT):
        interfaces.append(parse_interface(pe, table_index))

    for directory in (args.out.parent, args.type_dir):
        directory.mkdir(parents=True, exist_ok=True)

    fieldnames = [
        "interface_index", "interface_name", "iid",
        "method_index", "method_name",
        "proc_offset", "proc_va", "opnum",
        "x86_stack_size", "client_buffer_size", "server_buffer_size",
        "proc_flags", "param_count", "oi2_flags", "oi2_ext_flags",
        "raw_header", "params",
    ]

    with args.out.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, delimiter="\t", fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()

        for interface in interfaces:
            names = KNOWN_METHODS[interface.name]

            # The offset table has one u16 per method; the first three slots
            # are skipped and only the user methods are decoded.
            offsets = []
            for slot in range(interface.method_count):
                offsets.append(pe.u16(interface.offset_table_va + slot * 2))
            user_offsets = offsets[3:3 + interface.user_method_count]
            if len(names) != len(user_offsets):
                raise ValueError(f"{interface.name} method names do not match recovered offset count")

            for position, method_name in enumerate(names):
                row = parse_proc(pe, interface, position + 3, method_name, user_offsets[position])
                writer.writerow(row)

            type_snippet = pe.slice(interface.type_format_va, 0x98)
            snippet_path = args.type_dir / f"{interface.index}-{safe_name(interface.name)}.txt"
            snippet_lines = [
                f"{interface.name}",
                f"iid={interface.iid}",
                f"stub_desc=0x{interface.stub_desc_va:08x}",
                f"proc_format=0x{interface.proc_format_va:08x}",
                f"offset_table=0x{interface.offset_table_va:08x}",
                f"type_format=0x{interface.type_format_va:08x}",
                f"type_bytes={hex_bytes(type_snippet)}",
                "",
            ]
            snippet_path.write_text("\n".join(snippet_lines), encoding="utf-8")

    print(f"wrote {args.out}")
    print(f"wrote type snippets under {args.type_dir}")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|