"""Dump MIDL proxy/stub procedure descriptors from a 32-bit proxy DLL.

The script walks hard-coded tables inside the image (interface names, proxy
vtables and stub vtables), decodes each user method's procedure format
header and parameter descriptors, and writes:

* one TSV row per method (``--out``), and
* one text file per interface with a raw dump of the start of its
  type-format string (``--type-dir``).

NOTE(review): the table addresses below are absolute virtual addresses and
are presumably valid only for the specific NmxSvcps.dll build they were
recovered from -- confirm before running against another build.
"""

from __future__ import annotations

import argparse
import csv
import re
import uuid
from dataclasses import dataclass
from pathlib import Path

import pefile

# Virtual addresses of three parallel pointer arrays, each INTERFACE_COUNT
# 32-bit entries long (see parse_interface for how they are indexed).
PROXY_VTBL_LIST = 0x10007CC8
STUB_VTBL_LIST = 0x10007CE4
NAME_LIST = 0x10007D00
INTERFACE_COUNT = 6

# Leading dispatch-table slots that are skipped before user methods start.
# NOTE(review): presumably the three IUnknown methods -- confirm.
IUNKNOWN_METHOD_COUNT = 3

# Fixed sizes this script assumes for the procedure format string.
# NOTE(review): a 24-byte header presumably corresponds to an Oif header with
# rpc_flags present, no explicit handle descriptor and an 8-byte extension
# block -- confirm against the NDR format-string documentation.
PROC_HEADER_SIZE = 24
PARAM_DESC_SIZE = 6

# Parameter attribute bit meaning "type field holds a base-type format char".
BASE_TYPE_FLAG = 0x0040

# Number of type-format bytes shown inline per parameter / dumped per interface.
TYPE_PREVIEW_SIZE = 18
TYPE_SNIPPET_SIZE = 0x98

# Human-assigned method names, in dispatch order after the skipped slots.
KNOWN_METHODS = {
    "INmxService2": [
        "RegisterEngine",
        "UnRegisterEngine",
        "Connect",
        "TransferData",
        "AddSubscriberEngine",
        "RemoveSubscriberEngine",
        "SetHeartbeatSendInterval",
        "RegisterEngine2",
        "GetPartnerVersion",
    ],
    "INmxSvcStatistics": [
        "GetNmxSvcStatistics",
        "ResetSvcStatistics",
    ],
    "INmxStatus": [
        "OPENCONNECTION",
        "CloseConnection",
        "GetConnectionStatus",
    ],
    "INmxService": [
        "RegisterEngine",
        "UnRegisterEngine",
        "Connect",
        "TransferData",
        "AddSubscriberEngine",
        "RemoveSubscriberEngine",
        "SetHeartbeatSendInterval",
    ],
    "INmxNotify": [
        "ConnectionEstablished",
        "ConnectionClosed",
    ],
    "INmxSvcCallback": [
        "DataReceived",
        "StatusReceived",
    ],
}

# Format-character byte -> symbolic FC_* name used when annotating bytes.
FC_TYPES = {
    0x02: "FC_CHAR",
    0x03: "FC_SMALL",
    0x04: "FC_USMALL",
    0x05: "FC_WCHAR",
    0x06: "FC_SHORT",
    0x07: "FC_USHORT",
    0x08: "FC_LONG",
    0x09: "FC_ULONG",
    0x0A: "FC_FLOAT",
    0x0B: "FC_HYPER",
    0x0C: "FC_DOUBLE",
    0x0D: "FC_ENUM16",
    0x0E: "FC_ENUM32",
    0x10: "FC_ERROR_STATUS_T",
    0x11: "FC_RP",
    0x12: "FC_UP",
    0x14: "FC_FP",
    0x15: "FC_STRUCT",
    0x1B: "FC_CARRAY",
    0x2F: "FC_IP",
    0x36: "FC_BYTE_COUNT_POINTER",
    0x46: "FC_NO_REPEAT",
    0x4C: "FC_EMBEDDED_COMPLEX",
    0x5A: "FC_CONSTANT_IID",
    0x5B: "FC_END",
    0x5C: "FC_PAD",
}

# (bit mask, label) pairs for the 16-bit parameter attribute word.
PARAM_FLAGS = [
    (0x0001, "must_size"),
    (0x0002, "must_free"),
    (0x0008, "in"),
    (0x0010, "out"),
    (0x0020, "return"),
    (0x0040, "base_type"),
    (0x0080, "by_value"),
    (0x0100, "simple_ref"),
]


def _read_uint(data: bytes, offset: int, size: int) -> int:
    """Return the little-endian unsigned integer of *size* bytes at *offset*.

    Raises:
        ValueError: if the requested range is not fully inside *data*.
            Without this check a short slice would silently decode to a
            smaller (or zero) value.
    """
    end = offset + size
    if offset < 0 or end > len(data):
        raise ValueError(
            f"cannot read {size} bytes at offset {offset}: "
            f"buffer holds {len(data)} bytes"
        )
    return int.from_bytes(data[offset:end], "little")


def u16(data: bytes, offset: int) -> int:
    """Read a little-endian unsigned 16-bit value from *data* at *offset*.

    Raises:
        ValueError: if fewer than two bytes are available at *offset*.
    """
    return _read_uint(data, offset, 2)


def u32(data: bytes, offset: int) -> int:
    """Read a little-endian unsigned 32-bit value from *data* at *offset*.

    Raises:
        ValueError: if fewer than four bytes are available at *offset*.
    """
    return _read_uint(data, offset, 4)


class PeView:
    """Read-only accessor that addresses a PE file's bytes by virtual address."""

    def __init__(self, path: Path):
        """Parse *path* with pefile and keep a private copy of the raw bytes."""
        self.pe = pefile.PE(str(path))
        self.data = bytes(self.pe.__data__)
        self.base = self.pe.OPTIONAL_HEADER.ImageBase

    def offset(self, va: int) -> int:
        """Translate virtual address *va* to a file offset."""
        return self.pe.get_offset_from_rva(va - self.base)

    def slice(self, va: int, length: int) -> bytes:
        """Return up to *length* raw bytes starting at *va*.

        The result is shorter than *length* when the range runs past the end
        of the file; callers that need an exact size must check it.
        """
        offset = self.offset(va)
        return self.data[offset:offset + length]

    def u16(self, va: int) -> int:
        """Read a little-endian unsigned 16-bit value at *va*."""
        return u16(self.data, self.offset(va))

    def u32(self, va: int) -> int:
        """Read a little-endian unsigned 32-bit value at *va*."""
        return u32(self.data, self.offset(va))

    def guid(self, va: int) -> str:
        """Return the 16 bytes at *va* as an upper-case GUID string."""
        return str(uuid.UUID(bytes_le=self.slice(va, 16))).upper()

    def asciiz(self, va: int) -> str:
        """Return the NUL-terminated ASCII string that starts at *va*."""
        start = self.offset(va)
        end = self.data.index(b"\x00", start)
        return self.data[start:end].decode("ascii")


@dataclass(frozen=True)
class InterfaceInfo:
    """Addresses and counts recovered for one interface (see parse_interface)."""

    index: int  # position in the NAME/PROXY/STUB pointer arrays
    name: str  # string read through NAME_LIST
    iid: str  # upper-case GUID string
    method_count: int  # 32-bit value read at stub_vtbl + 8
    user_method_count: int  # method_count minus the skipped leading slots
    proxy_info_va: int  # pointer stored at proxy_vtbl + 0
    stub_desc_va: int  # pointer stored at proxy_info + 0
    proc_format_va: int  # pointer stored at proxy_info + 4
    offset_table_va: int  # pointer stored at proxy_info + 8
    type_format_va: int  # pointer stored at stub_desc + 0x20


def parse_interface(pe: PeView, index: int) -> InterfaceInfo:
    """Follow the pointer tables for interface *index* and collect its info.

    NOTE(review): the field offsets used here presumably match the 32-bit
    layouts of the MIDL-generated proxy/stub vtable headers,
    MIDL_STUBLESS_PROXY_INFO and MIDL_STUB_DESC -- confirm.
    """
    slot = index * 4
    name_va = pe.u32(NAME_LIST + slot)
    proxy_vtbl = pe.u32(PROXY_VTBL_LIST + slot)
    stub_vtbl = pe.u32(STUB_VTBL_LIST + slot)

    proxy_info = pe.u32(proxy_vtbl)
    iid_va = pe.u32(proxy_vtbl + 4)
    method_count = pe.u32(stub_vtbl + 8)

    stub_desc = pe.u32(proxy_info)
    proc_format = pe.u32(proxy_info + 4)
    offset_table = pe.u32(proxy_info + 8)
    type_format = pe.u32(stub_desc + 0x20)

    return InterfaceInfo(
        index=index,
        name=pe.asciiz(name_va),
        iid=pe.guid(iid_va),
        method_count=method_count,
        user_method_count=max(0, method_count - IUNKNOWN_METHOD_COUNT),
        proxy_info_va=proxy_info,
        stub_desc_va=stub_desc,
        proc_format_va=proc_format,
        offset_table_va=offset_table,
        type_format_va=type_format,
    )


def hex_bytes(data: bytes) -> str:
    """Format *data* as space-separated lower-case hex byte pairs."""
    return data.hex(" ")


def safe_name(value: str) -> str:
    """Replace each run of characters outside ``A-Za-z0-9_.-`` with ``_``."""
    return re.sub(r"[^A-Za-z0-9_.-]+", "_", value)


def flags_text(value: int) -> str:
    """Render a parameter attribute word as ``|``-joined flag names.

    Bits without an entry in PARAM_FLAGS are appended as one hex literal;
    ``"none"`` is returned when no bit is set.
    """
    names = [name for bit, name in PARAM_FLAGS if value & bit]
    remainder = value & ~sum(bit for bit, _ in PARAM_FLAGS)
    if remainder:
        names.append(f"0x{remainder:04x}")
    return "|".join(names) if names else "none"


def type_text(type_or_offset: int, type_format_va: int, pe: PeView, is_base_type: bool) -> str:
    """Describe a parameter's type field.

    Args:
        type_or_offset: the 16-bit type field of a parameter descriptor.
        type_format_va: virtual address of the interface's type-format string.
        pe: image accessor; only used when *is_base_type* is false.
        is_base_type: whether the descriptor's base-type flag is set.

    Returns:
        For base types, the FC_* name of the format character. Otherwise the
        offset, its virtual address and an annotated preview of the bytes
        found there.
    """
    if is_base_type:
        # The format character occupies only the low byte of the field; the
        # high byte is padding. Looking up the whole 16-bit value would hide
        # a known FC_* name whenever the padding is non-zero, so decode the
        # low byte and surface any unexpected padding explicitly.
        format_char = type_or_offset & 0xFF
        unused = type_or_offset >> 8
        label = FC_TYPES.get(format_char, f"FC_0x{format_char:02x}")
        return f"{label}(unused=0x{unused:02x})" if unused else label

    type_va = type_format_va + type_or_offset
    # Best-effort preview: a short read near the end of the file is accepted.
    raw = pe.slice(type_va, TYPE_PREVIEW_SIZE)
    annotated = [FC_TYPES.get(byte, f"0x{byte:02x}") for byte in raw]
    return f"type+0x{type_or_offset:04x} @{type_va:#010x} [{', '.join(annotated)}]"


def parse_proc(pe: PeView, interface: InterfaceInfo, method_index: int, method_name: str, proc_offset: int) -> dict[str, object]:
    """Decode one procedure's format header and parameter descriptors.

    Args:
        pe: image accessor.
        interface: interface the procedure belongs to.
        method_index: dispatch-table index written to the output row.
        method_name: label written to the output row.
        proc_offset: offset of the procedure inside the proc-format string.

    Returns:
        A row dict keyed by the TSV column names used in main().

    Raises:
        ValueError: if the header or a parameter descriptor is truncated.
    """
    va = interface.proc_format_va + proc_offset
    header = pe.slice(va, PROC_HEADER_SIZE)
    if len(header) != PROC_HEADER_SIZE:
        # Fail with context instead of an IndexError on header[15..17].
        raise ValueError(
            f"{interface.name}.{method_name}: truncated procedure header "
            f"at 0x{va:08x} ({len(header)} of {PROC_HEADER_SIZE} bytes)"
        )

    param_count = header[15]
    params = []
    for index in range(param_count):
        # Descriptors follow the header back to back, PARAM_DESC_SIZE each.
        desc = pe.slice(va + PROC_HEADER_SIZE + index * PARAM_DESC_SIZE, PARAM_DESC_SIZE)
        flags = u16(desc, 0)
        stack_offset = u16(desc, 2)
        type_or_offset = u16(desc, 4)
        is_base_type = bool(flags & BASE_TYPE_FLAG)
        params.append(
            f"p{index}:flags=0x{flags:04x}({flags_text(flags)}),"
            f"stack={stack_offset},"
            f"type={type_text(type_or_offset, interface.type_format_va, pe, is_base_type)}"
        )

    # NOTE(review): the column names for header bytes 14, 16 and 17 are kept
    # for compatibility with existing output; confirm they match the
    # documented header field names before relying on them.
    return {
        "interface_index": interface.index,
        "interface_name": interface.name,
        "iid": interface.iid,
        "method_index": method_index,
        "method_name": method_name,
        "proc_offset": f"0x{proc_offset:04x}",
        "proc_va": f"0x{va:08x}",
        "opnum": u16(header, 6),
        "x86_stack_size": u16(header, 8),
        "client_buffer_size": u16(header, 10),
        "server_buffer_size": u16(header, 12),
        "proc_flags": f"0x{header[14]:02x}",
        "param_count": param_count,
        "oi2_flags": f"0x{header[16]:02x}",
        "oi2_ext_flags": f"0x{header[17]:02x}",
        "raw_header": hex_bytes(header),
        "params": "; ".join(params),
    }


def main() -> int:
    """Parse the DLL and write the procedure TSV and type-format snippets.

    Returns:
        Process exit status (always 0 on success).

    Raises:
        KeyError: if an interface name has no entry in KNOWN_METHODS.
        ValueError: if the number of known method names does not match the
            number of recovered user-method offsets, or a read is truncated.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dll",
        type=Path,
        default=Path(r"C:\Program Files (x86)\ArchestrA\Framework\Bin\NmxSvcps.dll"),
    )
    parser.add_argument("--out", type=Path, default=Path("analysis/proxy/nmxsvcps-procedures.tsv"))
    parser.add_argument("--type-dir", type=Path, default=Path("analysis/proxy/type-format-snippets"))
    args = parser.parse_args()

    pe = PeView(args.dll)
    interfaces = [parse_interface(pe, i) for i in range(INTERFACE_COUNT)]
    args.out.parent.mkdir(parents=True, exist_ok=True)
    args.type_dir.mkdir(parents=True, exist_ok=True)

    fieldnames = [
        "interface_index",
        "interface_name",
        "iid",
        "method_index",
        "method_name",
        "proc_offset",
        "proc_va",
        "opnum",
        "x86_stack_size",
        "client_buffer_size",
        "server_buffer_size",
        "proc_flags",
        "param_count",
        "oi2_flags",
        "oi2_ext_flags",
        "raw_header",
        "params",
    ]
    first_user_slot = IUNKNOWN_METHOD_COUNT

    with args.out.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, delimiter="\t", fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        for interface in interfaces:
            names = KNOWN_METHODS[interface.name]
            # The offset table holds one 16-bit proc-format offset per slot.
            offsets = [pe.u16(interface.offset_table_va + i * 2) for i in range(interface.method_count)]
            user_offsets = offsets[first_user_slot:first_user_slot + interface.user_method_count]
            if len(names) != len(user_offsets):
                raise ValueError(f"{interface.name} method names do not match recovered offset count")

            rows = enumerate(zip(names, user_offsets), start=first_user_slot)
            for method_index, (method_name, proc_offset) in rows:
                writer.writerow(parse_proc(pe, interface, method_index, method_name, proc_offset))

            # Best-effort raw dump of the start of the type-format string.
            type_snippet = pe.slice(interface.type_format_va, TYPE_SNIPPET_SIZE)
            snippet_path = args.type_dir / f"{interface.index}-{safe_name(interface.name)}.txt"
            snippet_path.write_text(
                "\n".join([
                    f"{interface.name}",
                    f"iid={interface.iid}",
                    f"stub_desc=0x{interface.stub_desc_va:08x}",
                    f"proc_format=0x{interface.proc_format_va:08x}",
                    f"offset_table=0x{interface.offset_table_va:08x}",
                    f"type_format=0x{interface.type_format_va:08x}",
                    f"type_bytes={hex_bytes(type_snippet)}",
                    "",
                ]),
                encoding="utf-8",
            )

    print(f"wrote {args.out}")
    print(f"wrote type snippets under {args.type_dir}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())