Files
mxaccess/analysis/scripts/parse_dcerpc_streams.py
T
Joseph Doherty fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Initial project state: .NET reference, design, Rust port (M0+M1), evidence
Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 06:21:00 -04:00

242 lines
7.0 KiB
Python

from __future__ import annotations
import argparse
import csv
from dataclasses import dataclass
from pathlib import Path
import uuid
# PDU type codes this tool recognises, mapped to the label written to the
# ``ptype_name`` column of the TSV output. ``looks_like_header`` rejects any
# candidate header whose type byte is not a key of this mapping.
PTYPE_NAMES: dict[int, str] = {
    0: "request",
    2: "response",
    3: "fault",
    11: "bind",
    12: "bind_ack",
    14: "alter_context",
    15: "alter_context_resp",
}
@dataclass
class DceRpcPdu:
stream: str
offset: int
ptype: int
flags: int
frag_len: int
auth_len: int
call_id: int
alloc_hint: int | None
context_id: int | None
opnum: int | None
stub_offset: int | None
stub_len: int
stub: bytes
context_interface_uuid: str = ""
context_interface_version: str = ""
bind_contexts: str = ""
def u16le(data: bytes, offset: int) -> int:
    """Return the little-endian unsigned integer in ``data[offset:offset + 2]``.

    The bytes are taken with a slice, so a read that runs past the end of
    ``data`` decodes whatever bytes are available (0 when none) instead of
    raising.
    """
    field = data[offset:offset + 2]
    return int.from_bytes(field, byteorder="little")
def u32le(data: bytes, offset: int) -> int:
    """Return the little-endian unsigned integer in ``data[offset:offset + 4]``.

    The bytes are taken with a slice, so a read that runs past the end of
    ``data`` decodes whatever bytes are available (0 when none) instead of
    raising.
    """
    field = data[offset:offset + 4]
    return int.from_bytes(field, byteorder="little")
def guid_le(data: bytes, offset: int) -> str:
    """Format the 16 bytes at ``offset`` as an upper-case GUID string.

    The bytes are interpreted with ``uuid.UUID(bytes_le=...)``, i.e. the first
    three GUID fields are stored little-endian.

    Raises:
        ValueError: If fewer than 16 bytes are available at ``offset``.
    """
    raw = data[offset:offset + 16]
    parsed = uuid.UUID(bytes_le=raw)
    return str(parsed).upper()
def looks_like_header(data: bytes, offset: int) -> bool:
    """Report whether a plausible 16-byte PDU header starts at ``offset``.

    A candidate is accepted only when all of the following hold:

    * at least 16 bytes remain in ``data``;
    * byte 0 is 5 and byte 1 is 0 (version fields);
    * byte 2 is one of the PDU types listed in ``PTYPE_NAMES``;
    * bytes 4-7 equal ``10 00 00 00``;
    * the 16-bit fragment length at byte 8 is at least 16 and the whole
      fragment lies inside ``data``;
    * the 16-bit auth length at byte 10 does not exceed the fragment length.
    """
    available = len(data) - offset
    if available < 16:
        return False

    version_matches = data[offset] == 5 and data[offset + 1] == 0
    if not version_matches:
        return False

    if data[offset + 2] not in PTYPE_NAMES:
        return False

    if data[offset + 4:offset + 8] != b"\x10\x00\x00\x00":
        return False

    frag_len = u16le(data, offset + 8)
    auth_len = u16le(data, offset + 10)
    fragment_fits = frag_len >= 16 and offset + frag_len <= len(data)
    return fragment_fits and auth_len <= frag_len
def parse_pdu(stream: str, data: bytes, offset: int) -> DceRpcPdu:
    """Decode the PDU header at ``offset`` and slice out its stub bytes.

    The header is not re-validated here; callers are expected to have checked
    it with ``looks_like_header`` first.

    Args:
        stream: Label stored on the result (callers pass the file name).
        data: Entire contents of the captured stream.
        offset: Position of the first header byte within ``data``.

    Returns:
        A ``DceRpcPdu``. ``alloc_hint``, ``context_id`` and ``stub_offset``
        are populated only for request (0), response (2) and fault (3) PDUs
        whose fragment is at least 24 bytes long; ``opnum`` only for
        requests. ``stub`` excludes the trailing auth verifier and the
        padding that precedes it. The context/bind string fields are left at
        their defaults.
    """
    ptype = data[offset + 2]
    flags = data[offset + 3]
    frag_len = u16le(data, offset + 8)
    auth_len = u16le(data, offset + 10)
    call_id = u32le(data, offset + 12)

    alloc_hint = None
    context_id = None
    opnum = None
    stub_offset = None
    stub = b""

    if ptype in (0, 2, 3) and frag_len >= 24:
        alloc_hint = u32le(data, offset + 16)
        context_id = u16le(data, offset + 20)
        if ptype == 0:
            # Only requests have an operation number in bytes 22-23.
            opnum = u16le(data, offset + 22)
        # NOTE(review): per the DCE/RPC request layout, a request with flag
        # bit 0x80 (PFC_OBJECT_UUID) carries a 16-byte object UUID here,
        # before the real stub. It is intentionally still reported as part of
        # ``stub`` so existing TSV offsets keep their meaning - confirm that
        # is what downstream analysis expects.
        stub_offset = offset + 24

    if stub_offset is not None:
        stub_end = offset + frag_len
        if auth_len:
            # The fragment ends with an 8-byte sec_trailer followed by
            # ``auth_len`` bytes of auth value.
            trailer_start = stub_end - auth_len - 8
            stub_end = trailer_start
            if trailer_start >= stub_offset:
                # Byte 2 of the sec_trailer is auth_pad_length: the number of
                # pad bytes inserted between the stub and the trailer. Those
                # bytes are not stub data, so drop them. A slice (not an
                # index) keeps truncated input from raising, and an
                # implausibly large value is ignored.
                pad_field = data[trailer_start + 2:trailer_start + 3]
                if pad_field and pad_field[0] <= trailer_start - stub_offset:
                    stub_end -= pad_field[0]
        if stub_end >= stub_offset:
            stub = data[stub_offset:stub_end]

    return DceRpcPdu(
        stream=stream,
        offset=offset,
        ptype=ptype,
        flags=flags,
        frag_len=frag_len,
        auth_len=auth_len,
        call_id=call_id,
        alloc_hint=alloc_hint,
        context_id=context_id,
        opnum=opnum,
        stub_offset=stub_offset,
        stub_len=len(stub),
        stub=stub,
    )
def parse_bind_contexts(data: bytes, offset: int, frag_len: int) -> dict[int, tuple[str, str]]:
    """Extract the presentation-context list from a bind-style PDU.

    Args:
        data: Entire contents of the captured stream.
        offset: Position of the PDU's first header byte within ``data``.
        frag_len: Fragment length taken from that PDU's header.

    Returns:
        A mapping of context id to ``(interface_uuid, "<n>.<m>")``. The
        mapping is empty when the fragment is shorter than 28 bytes, and
        parsing stops early as soon as fewer than 24 bytes remain for the
        next element.
    """
    found: dict[int, tuple[str, str]] = {}
    if frag_len >= 28:
        limit = offset + frag_len
        # Byte 24 holds the element count; the elements start at byte 28.
        remaining = data[offset + 24]
        pos = offset + 28
        while remaining > 0 and pos + 24 <= limit:
            remaining -= 1
            # Element header: 2-byte context id, 1-byte transfer-syntax
            # count, 1 byte skipped.
            ctx_id = u16le(data, pos)
            n_transfer = data[pos + 2]
            syntax_at = pos + 4
            # Abstract syntax: 16-byte UUID followed by two 16-bit version
            # fields.
            iid = guid_le(data, syntax_at)
            ver_first = u16le(data, syntax_at + 16)
            ver_second = u16le(data, syntax_at + 18)
            found[ctx_id] = (iid, f"{ver_first}.{ver_second}")
            # Step over the abstract syntax and every 20-byte transfer syntax.
            pos = syntax_at + 20 + n_transfer * 20
    return found
def parse_stream(stream_path: Path) -> list[DceRpcPdu]:
    """Scan one captured stream file and return every PDU found in it.

    The file is walked byte by byte; whenever ``looks_like_header`` accepts a
    position, the PDU is parsed and the scan jumps past its fragment.
    Contexts announced by bind (11) and alter_context (14) PDUs are
    remembered for the rest of the file and used to label any later PDU that
    carries a matching ``context_id``.

    Args:
        stream_path: File containing the raw bytes of one stream.

    Returns:
        The PDUs in the order they occur in the file.
    """
    data = stream_path.read_bytes()
    total = len(data)
    pdus: list[DceRpcPdu] = []
    known_contexts: dict[int, tuple[str, str]] = {}

    pos = 0
    while pos < total:
        if not looks_like_header(data, pos):
            # Not a header here: resynchronise one byte further on.
            pos += 1
            continue

        pdu = parse_pdu(stream_path.name, data, pos)

        if pdu.ptype in (11, 14):
            negotiated = parse_bind_contexts(data, pos, pdu.frag_len)
            known_contexts.update(negotiated)
            entries = (
                f"{context_id}:{iid}:{version}"
                for context_id, (iid, version) in sorted(negotiated.items())
            )
            pdu.bind_contexts = ";".join(entries)

        if pdu.context_id is not None:
            binding = known_contexts.get(pdu.context_id)
            if binding is not None:
                pdu.context_interface_uuid, pdu.context_interface_version = binding

        pdus.append(pdu)
        pos += pdu.frag_len

    return pdus
def main() -> int:
    """Parse every matching stream file in a capture directory into a TSV.

    Command line:
        capture_dir    Directory holding the ``tcp-stream-*49704*.bin`` files;
                       the output ``dcerpc-stream-pdus.tsv`` is written there.
        --scalar-int   Optional integer. Its 4-byte little-endian signed
                       encoding is searched for in every stub and the match
                       offsets are listed in ``scalar_hit_offsets``.

    Returns:
        0 once the TSV has been written.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("capture_dir", type=Path)
    cli.add_argument("--scalar-int", type=int)
    args = cli.parse_args()

    stream_files = sorted(args.capture_dir.glob("tcp-stream-*49704*.bin"))
    rows: list[DceRpcPdu] = []
    for stream_file in stream_files:
        rows.extend(parse_stream(stream_file))

    if args.scalar_int is None:
        scalar = None
    else:
        scalar = args.scalar_int.to_bytes(4, "little", signed=True)

    columns = (
        "stream",
        "offset",
        "ptype",
        "ptype_name",
        "flags",
        "frag_len",
        "auth_len",
        "call_id",
        "alloc_hint",
        "context_id",
        "context_interface_uuid",
        "context_interface_version",
        "opnum",
        "stub_offset",
        "stub_len",
        "scalar_hit_offsets",
        "bind_contexts",
        "stub_hex_prefix",
    )

    out_path = args.capture_dir / "dcerpc-stream-pdus.tsv"
    with out_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle, delimiter="\t", lineterminator="\n")
        writer.writerow(columns)
        for row in rows:
            # Collect every (possibly overlapping) occurrence of the scalar.
            hits: list[str] = []
            if scalar and row.stub:
                found_at = row.stub.find(scalar)
                while found_at >= 0:
                    hits.append(str(found_at))
                    found_at = row.stub.find(scalar, found_at + 1)

            # Optional header fields are written as empty cells when absent.
            alloc_hint_cell = "" if row.alloc_hint is None else row.alloc_hint
            context_id_cell = "" if row.context_id is None else row.context_id
            opnum_cell = "" if row.opnum is None else row.opnum
            stub_offset_cell = "" if row.stub_offset is None else row.stub_offset

            writer.writerow(
                (
                    row.stream,
                    row.offset,
                    row.ptype,
                    PTYPE_NAMES.get(row.ptype, str(row.ptype)),
                    f"0x{row.flags:02x}",
                    row.frag_len,
                    row.auth_len,
                    row.call_id,
                    alloc_hint_cell,
                    context_id_cell,
                    row.context_interface_uuid,
                    row.context_interface_version,
                    opnum_cell,
                    stub_offset_cell,
                    row.stub_len,
                    ",".join(hits),
                    row.bind_contexts,
                    # Only the first 64 stub bytes are shown.
                    row.stub[:64].hex(" "),
                )
            )

    print(f"wrote {out_path} pdus={len(rows)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())