"""Carve DCE/RPC PDUs out of raw TCP stream dumps and tabulate them as TSV.

Each input file is scanned byte by byte for something that looks like a
connection-oriented DCE/RPC version 5.0 header with little-endian data
representation.  Every PDU found is decoded far enough to report its type,
flags, lengths, call id and (for request/response/fault) the stub bytes.
Presentation contexts announced in bind / alter_context PDUs are remembered
so later PDUs can be labelled with the interface UUID they target.
"""

from __future__ import annotations

import argparse
import csv
import uuid
from dataclasses import dataclass
from pathlib import Path

# PDU types (the ``ptype`` header byte) that the scanner accepts, by wire value.
PTYPE_NAMES = {
    0: "request",
    2: "response",
    3: "fault",
    11: "bind",
    12: "bind_ack",
    14: "alter_context",
    15: "alter_context_resp",
}

# File-name pattern used when --glob is not given.
DEFAULT_STREAM_GLOB = "tcp-stream-*49704*.bin"

# Column order of the generated TSV file.
TSV_COLUMNS = (
    "stream",
    "offset",
    "ptype",
    "ptype_name",
    "flags",
    "frag_len",
    "auth_len",
    "call_id",
    "alloc_hint",
    "context_id",
    "context_interface_uuid",
    "context_interface_version",
    "opnum",
    "stub_offset",
    "stub_len",
    "scalar_hit_offsets",
    "bind_contexts",
    "stub_hex_prefix",
)

_COMMON_HEADER_LEN = 16  # version .. call_id
_CALL_HEADER_LEN = 24  # common header + alloc_hint, context id, opnum/cancel
_OBJECT_UUID_LEN = 16
_SEC_TRAILER_LEN = 8  # fixed part of the auth verifier, before auth_value
_PFC_OBJECT_UUID = 0x80  # request carries an object UUID after opnum
_LITTLE_ENDIAN_DREP = b"\x10\x00\x00\x00"


@dataclass
class DceRpcPdu:
    """One decoded PDU.

    ``offset`` and ``stub_offset`` are byte positions inside the stream file
    named by ``stream``.  ``alloc_hint``, ``context_id``, ``opnum`` and
    ``stub_offset`` stay ``None`` for PDU types that do not carry them.
    The three trailing string fields are filled in by ``parse_stream``.
    """

    stream: str
    offset: int
    ptype: int
    flags: int
    frag_len: int
    auth_len: int
    call_id: int
    alloc_hint: int | None
    context_id: int | None
    opnum: int | None
    stub_offset: int | None
    stub_len: int
    stub: bytes
    context_interface_uuid: str = ""
    context_interface_version: str = ""
    bind_contexts: str = ""


def u16le(data: bytes, offset: int) -> int:
    """Return the unsigned little-endian 16-bit integer at *offset*."""
    return int.from_bytes(data[offset:offset + 2], "little")


def u32le(data: bytes, offset: int) -> int:
    """Return the unsigned little-endian 32-bit integer at *offset*."""
    return int.from_bytes(data[offset:offset + 4], "little")


def guid_le(data: bytes, offset: int) -> str:
    """Return the 16 bytes at *offset* as an upper-case UUID string.

    The bytes are interpreted in the mixed-endian ``bytes_le`` layout.
    Raises ``ValueError`` when fewer than 16 bytes are available.
    """
    return str(uuid.UUID(bytes_le=data[offset:offset + 16])).upper()


def looks_like_header(data: bytes, offset: int) -> bool:
    """Report whether a plausible, fully contained PDU starts at *offset*.

    Accepts only version 5.0, a ptype listed in ``PTYPE_NAMES``, the
    little-endian data representation, and a fragment that fits entirely
    inside *data*.
    """
    if offset + _COMMON_HEADER_LEN > len(data):
        return False
    if data[offset] != 5 or data[offset + 1] != 0:
        return False
    if data[offset + 2] not in PTYPE_NAMES:
        return False
    if data[offset + 4:offset + 8] != _LITTLE_ENDIAN_DREP:
        return False

    frag_len = u16le(data, offset + 8)
    auth_len = u16le(data, offset + 10)
    if frag_len < _COMMON_HEADER_LEN or offset + frag_len > len(data):
        return False
    return auth_len <= frag_len


def parse_pdu(stream: str, data: bytes, offset: int) -> DceRpcPdu:
    """Decode the PDU starting at *offset*.

    The caller is expected to have validated the position with
    ``looks_like_header`` first.  For request (0), response (2) and fault (3)
    PDUs the call header is decoded and the stub bytes are extracted:

    * a request whose flags contain PFC_OBJECT_UUID (0x80) carries a 16-byte
      object UUID after ``opnum``; the stub starts after it;
    * when ``auth_len`` is non-zero the 8-byte security trailer and the
      ``auth_len`` bytes of auth value are removed from the end, together
      with the padding announced by the trailer's ``auth_pad_length`` byte.

    For fault PDUs everything after the first 24 bytes is reported as stub.
    """
    ptype = data[offset + 2]
    flags = data[offset + 3]
    frag_len = u16le(data, offset + 8)
    auth_len = u16le(data, offset + 10)
    call_id = u32le(data, offset + 12)

    alloc_hint: int | None = None
    context_id: int | None = None
    opnum: int | None = None
    stub_offset: int | None = None
    stub = b""

    if ptype in (0, 2, 3) and frag_len >= _CALL_HEADER_LEN:
        alloc_hint = u32le(data, offset + 16)
        context_id = u16le(data, offset + 20)
        stub_offset = offset + _CALL_HEADER_LEN
        if ptype == 0:
            opnum = u16le(data, offset + 22)
            if flags & _PFC_OBJECT_UUID:
                # Skip the optional object UUID so it is not taken for stub.
                stub_offset += _OBJECT_UUID_LEN

    if stub_offset is not None:
        stub_end = offset + frag_len
        if auth_len:
            trailer_start = stub_end - auth_len - _SEC_TRAILER_LEN
            stub_end = trailer_start
            if stub_offset <= trailer_start and trailer_start + 2 < len(data):
                # Third byte of the security trailer is auth_pad_length.
                pad_len = data[trailer_start + 2]
                # Ignore an implausible pad length rather than going negative.
                if pad_len <= trailer_start - stub_offset:
                    stub_end -= pad_len
        if stub_end >= stub_offset:
            stub = data[stub_offset:stub_end]

    return DceRpcPdu(
        stream=stream,
        offset=offset,
        ptype=ptype,
        flags=flags,
        frag_len=frag_len,
        auth_len=auth_len,
        call_id=call_id,
        alloc_hint=alloc_hint,
        context_id=context_id,
        opnum=opnum,
        stub_offset=stub_offset,
        stub_len=len(stub),
        stub=stub,
    )


def parse_bind_contexts(
    data: bytes, offset: int, frag_len: int
) -> dict[int, tuple[str, str]]:
    """Return the presentation contexts of a bind / alter_context PDU.

    The result maps each context id to ``(abstract syntax UUID,
    "major.minor")``.  Parsing stops quietly at the first context element
    that would not fit inside the fragment.
    """
    contexts: dict[int, tuple[str, str]] = {}
    if frag_len < 28:
        return contexts

    element_count = data[offset + 24]
    end = offset + frag_len
    cursor = offset + 28  # first context element

    for _ in range(element_count):
        # 4 bytes of element header + 20 bytes of abstract syntax.
        if cursor + 24 > end:
            break
        context_id = u16le(data, cursor)
        transfer_count = data[cursor + 2]
        syntax_at = cursor + 4
        version = f"{u16le(data, syntax_at + 16)}.{u16le(data, syntax_at + 18)}"
        contexts[context_id] = (guid_le(data, syntax_at), version)
        # Step over the abstract syntax and every 20-byte transfer syntax.
        cursor = syntax_at + 20 + transfer_count * 20
    return contexts


def _parse_stream_data(stream: str, data: bytes) -> list[DceRpcPdu]:
    """Scan *data* for PDUs, resolving context ids to interface UUIDs."""
    rows: list[DceRpcPdu] = []
    contexts: dict[int, tuple[str, str]] = {}
    offset = 0
    while offset < len(data):
        if not looks_like_header(data, offset):
            offset += 1
            continue

        pdu = parse_pdu(stream, data, offset)
        if pdu.ptype in (11, 14):
            announced = parse_bind_contexts(data, offset, pdu.frag_len)
            contexts.update(announced)
            pdu.bind_contexts = ";".join(
                f"{context_id}:{iid}:{version}"
                for context_id, (iid, version) in sorted(announced.items())
            )
        if pdu.context_id is not None and pdu.context_id in contexts:
            iid, version = contexts[pdu.context_id]
            pdu.context_interface_uuid = iid
            pdu.context_interface_version = version
        rows.append(pdu)
        # frag_len is at least 16 here, so the scan always advances.
        offset += pdu.frag_len
    return rows


def parse_stream(stream_path: Path) -> list[DceRpcPdu]:
    """Read *stream_path* and return every PDU found in it, in file order.

    Bytes that do not start a recognisable PDU are skipped one at a time.
    Contexts learned from bind / alter_context PDUs apply to all later PDUs
    of the same file.
    """
    return _parse_stream_data(stream_path.name, stream_path.read_bytes())


def _find_all(haystack: bytes, needle: bytes) -> list[int]:
    """Return every offset (overlapping ones included) of *needle*."""
    hits: list[int] = []
    position = haystack.find(needle)
    while position >= 0:
        hits.append(position)
        position = haystack.find(needle, position + 1)
    return hits


def _encode_scalar(value: int) -> bytes:
    """Encode *value* as 4 little-endian bytes.

    Negative values use two's complement; non-negative values may use the
    full unsigned 32-bit range.  Raises ``OverflowError`` outside that.
    """
    return value.to_bytes(4, "little", signed=value < 0)


def _tsv_row(pdu: DceRpcPdu, scalar: bytes | None) -> list[object]:
    """Build the TSV record for *pdu*, in ``TSV_COLUMNS`` order."""
    hits: list[int] = []
    if scalar is not None and pdu.stub:
        hits = _find_all(pdu.stub, scalar)
    return [
        pdu.stream,
        pdu.offset,
        pdu.ptype,
        PTYPE_NAMES.get(pdu.ptype, str(pdu.ptype)),
        f"0x{pdu.flags:02x}",
        pdu.frag_len,
        pdu.auth_len,
        pdu.call_id,
        "" if pdu.alloc_hint is None else pdu.alloc_hint,
        "" if pdu.context_id is None else pdu.context_id,
        pdu.context_interface_uuid,
        pdu.context_interface_version,
        "" if pdu.opnum is None else pdu.opnum,
        "" if pdu.stub_offset is None else pdu.stub_offset,
        pdu.stub_len,
        ",".join(str(hit) for hit in hits),
        pdu.bind_contexts,
        pdu.stub[:64].hex(" "),
    ]


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Parses every matching stream dump in ``capture_dir`` and writes
    ``dcerpc-stream-pdus.tsv`` into the same directory.  *argv* defaults to
    the process arguments.  Returns 0 on success; invalid arguments end the
    process through ``argparse`` (exit status 2).
    """
    parser = argparse.ArgumentParser(
        description="Tabulate DCE/RPC PDUs found in raw TCP stream dumps."
    )
    parser.add_argument("capture_dir", type=Path)
    parser.add_argument(
        "--scalar-int",
        type=int,
        help="32-bit integer to search for (little-endian) in every stub",
    )
    parser.add_argument(
        "--glob",
        dest="stream_glob",
        default=DEFAULT_STREAM_GLOB,
        help="file-name pattern of the stream dumps (default: %(default)s)",
    )
    args = parser.parse_args(argv)

    if not args.capture_dir.is_dir():
        parser.error(f"not a directory: {args.capture_dir}")

    scalar: bytes | None = None
    if args.scalar_int is not None:
        try:
            scalar = _encode_scalar(args.scalar_int)
        except OverflowError:
            parser.error("--scalar-int must fit in 32 bits")

    rows: list[DceRpcPdu] = []
    for path in sorted(args.capture_dir.glob(args.stream_glob)):
        rows.extend(parse_stream(path))

    out_path = args.capture_dir / "dcerpc-stream-pdus.tsv"
    with out_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle, delimiter="\t", lineterminator="\n")
        writer.writerow(TSV_COLUMNS)
        writer.writerows(_tsv_row(row, scalar) for row in rows)

    print(f"wrote {out_path} pdus={len(rows)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())