Initial project state: .NET reference, design, Rust port (M0+M1), evidence
rust / build / test / clippy / fmt (push) Has been cancelled

Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 06:21:00 -04:00
parent 43733699b0
commit fe2a6db786
3849 changed files with 352975 additions and 0 deletions
+289
View File
@@ -0,0 +1,289 @@
from __future__ import annotations
import argparse
import csv
import datetime as dt
import json
import re
import struct
from dataclasses import dataclass
from pathlib import Path
from scapy.all import IP, IPv6, Raw, TCP, rdpcap
EVENT_RE = re.compile(r"^(?P<timestamp>\S+)\t(?P<event>[^\t]+)\t(?P<payload>.*)$")
@dataclass(frozen=True)
class Endpoint:
host: str
port: int
@classmethod
def parse(cls, text: str) -> "Endpoint":
host, port = text.rsplit(":", 1)
return cls(host, int(port))
def parse_timestamp(text: str) -> dt.datetime:
normalized = text.replace("Z", "+00:00")
parsed = dt.datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.timezone.utc)
return parsed.astimezone(dt.timezone.utc)
def harness_events(path: Path) -> list[dict[str, object]]:
events: list[dict[str, object]] = []
for line in path.read_text(encoding="utf-8").splitlines():
match = EVENT_RE.match(line)
if not match:
continue
try:
payload = json.loads(match.group("payload"))
except json.JSONDecodeError:
payload = {}
events.append({
"timestamp": parse_timestamp(match.group("timestamp")),
"event": match.group("event"),
"payload": payload,
})
return events
def find_event(events: list[dict[str, object]], name: str) -> dict[str, object]:
for event in events:
if event["event"] == name:
return event
raise RuntimeError(f"Event {name!r} was not found.")
def find_events(events: list[dict[str, object]], name: str) -> list[dict[str, object]]:
return [event for event in events if event["event"] == name]
def packet_hosts(packet) -> tuple[str, str] | None:
if IP in packet:
return str(packet[IP].src), str(packet[IP].dst)
if IPv6 in packet:
return str(packet[IPv6].src), str(packet[IPv6].dst)
return None
def i32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<i", data, offset)[0]
def u32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<I", data, offset)[0]
def ascii_preview(data: bytes, limit: int = 48) -> str:
return "".join(chr(value) if 32 <= value <= 126 else "." for value in data[:limit])
def announced_data_records_match(data: bytes, offset: int, announced_size: int) -> bool:
if announced_size < 0:
return False
total = 0
cursor = offset + 12
while total < announced_size and cursor + 4 <= len(data):
record_length = u32(data, cursor)
if record_length is None or record_length > 1024 * 1024:
return False
record_size = record_length + 4
if record_size <= 4 or cursor + record_size > len(data):
return False
total += record_size
cursor += record_size
return total == announced_size
def classify_control(data: bytes, offset: int) -> str | None:
first = i32(data, offset)
second = i32(data, offset + 4)
third = i32(data, offset + 8)
if first is None or second is None or third is None:
return None
if first in {-1, -2}:
return "control"
if third != 0 or second < 0:
return None
if announced_data_records_match(data, offset, first):
return "control_announce"
if offset + 12 == len(data):
return "control_announce"
return None
def iter_records(payload: bytes) -> list[tuple[str, int, bytes]]:
records: list[tuple[str, int, bytes]] = []
offset = 0
while offset < len(payload):
first = i32(payload, offset)
if first is None:
break
control_type = classify_control(payload, offset)
if control_type is not None:
record = payload[offset:offset + 12]
records.append((control_type, offset, record))
offset += 12
continue
length = u32(payload, offset)
if length is not None and length <= 1024 * 1024 and offset + 4 + length <= len(payload):
record = payload[offset:offset + 4 + length]
records.append(("data", offset, record))
offset += 4 + length
continue
records.append(("unknown", offset, payload[offset:]))
break
return records
def record_body(record_type: str, record: bytes) -> bytes:
if record_type == "data":
return record[4:]
return record
def write_rows(capture_dir: Path, endpoint_a: Endpoint, endpoint_b: Endpoint, before: float, after: float, out: Path) -> None:
events = harness_events(capture_dir / "harness.log")
write_begins = find_events(events, "mx.write.begin")
write_completes = find_events(events, "mx.event.write-complete")
if not write_begins:
raise RuntimeError("Event 'mx.write.begin' was not found.")
rows: list[dict[str, str]] = []
packets = list(enumerate(rdpcap(str(capture_dir / "loopback.pcapng")), start=1))
complete_cursor = 0
for ordinal, write_begin in enumerate(write_begins):
write_time = write_begin["timestamp"]
assert isinstance(write_time, dt.datetime)
while complete_cursor < len(write_completes):
candidate_time = write_completes[complete_cursor]["timestamp"]
assert isinstance(candidate_time, dt.datetime)
if candidate_time >= write_time:
break
complete_cursor += 1
if complete_cursor < len(write_completes):
complete_time = write_completes[complete_cursor]["timestamp"]
complete_cursor += 1
else:
complete_time = write_time
assert isinstance(complete_time, dt.datetime)
write_payload = write_begin.get("payload", {})
write_index = str(write_payload.get("WriteIndex", ordinal)) if isinstance(write_payload, dict) else str(ordinal)
write_value = ""
if isinstance(write_payload, dict) and isinstance(write_payload.get("Value"), dict):
value_payload = write_payload["Value"]
write_value = str(value_payload.get("Value", ""))
start_epoch = write_time.timestamp() - before
end_epoch = write_time.timestamp() + after
for frame, packet in packets:
if TCP not in packet or Raw not in packet:
continue
packet_time = float(packet.time)
if packet_time < start_epoch or packet_time > end_epoch:
continue
hosts = packet_hosts(packet)
if hosts is None:
continue
tcp = packet[TCP]
src = Endpoint(hosts[0], int(tcp.sport))
dst = Endpoint(hosts[1], int(tcp.dport))
if {src, dst} != {endpoint_a, endpoint_b}:
continue
direction = "a_to_b" if src == endpoint_a else "b_to_a"
payload = bytes(packet[Raw].load)
for record_index, (record_type, payload_offset, record) in enumerate(iter_records(payload)):
body = record_body(record_type, record)
rows.append({
"capture": capture_dir.name,
"write_index": write_index,
"write_value": write_value,
"frame": str(frame),
"packet_time_relative_to_write": f"{packet_time - write_time.timestamp():.9f}",
"packet_time_relative_to_complete": f"{packet_time - complete_time.timestamp():.9f}",
"direction": direction,
"src": f"{src.host}:{src.port}",
"dst": f"{dst.host}:{dst.port}",
"tcp_seq": str(int(tcp.seq)),
"payload_offset": str(payload_offset),
"record_index": str(record_index),
"record_type": record_type,
"record_size": str(len(record)),
"announced_length": "" if record_type != "data" else str(len(body)),
"i32_0": "" if (v := i32(body, 0)) is None else str(v),
"i32_1": "" if (v := i32(body, 4)) is None else str(v),
"i32_2": "" if (v := i32(body, 8)) is None else str(v),
"i32_3": "" if (v := i32(body, 12)) is None else str(v),
"signature16": body[:16].hex(" "),
"signature24": body[:24].hex(" "),
"hex": body.hex(" "),
"ascii_preview": ascii_preview(body),
})
out.parent.mkdir(parents=True, exist_ok=True)
header = [
"capture",
"write_index",
"write_value",
"frame",
"packet_time_relative_to_write",
"packet_time_relative_to_complete",
"direction",
"src",
"dst",
"tcp_seq",
"payload_offset",
"record_index",
"record_type",
"record_size",
"announced_length",
"i32_0",
"i32_1",
"i32_2",
"i32_3",
"signature16",
"signature24",
"hex",
"ascii_preview",
]
with out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("capture_dir", type=Path)
parser.add_argument("--a", default="127.0.0.1:57415")
parser.add_argument("--b", default="127.0.0.1:57433")
parser.add_argument("--before", type=float, default=0.35)
parser.add_argument("--after", type=float, default=0.75)
parser.add_argument("--out", type=Path)
args = parser.parse_args()
out = args.out or (args.capture_dir / "write-window-mixed-records.tsv")
write_rows(args.capture_dir, Endpoint.parse(args.a), Endpoint.parse(args.b), args.before, args.after, out)
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,127 @@
from __future__ import annotations
import argparse
import csv
import struct
from pathlib import Path
def read_i32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<i", data, offset)[0]
def read_u32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<I", data, offset)[0]
def ascii_preview(data: bytes, limit: int = 48) -> str:
chars = []
for value in data[:limit]:
chars.append(chr(value) if 32 <= value <= 126 else ".")
return "".join(chars)
def parse_frames(data: bytes, max_frame_size: int) -> list[dict[str, str]]:
frames: list[dict[str, str]] = []
offset = 0
index = 0
while offset + 4 <= len(data):
length = read_u32(data, offset)
assert length is not None
body_offset = offset + 4
end = body_offset + length
complete = end <= len(data)
plausible = 0 <= length <= max_frame_size
if not plausible:
frames.append({
"index": str(index),
"offset": f"0x{offset:08x}",
"length": str(length),
"complete": "0",
"i32_0": "",
"i32_1": "",
"i32_2": "",
"i32_3": "",
"u32_0_hex": "",
"u32_1_hex": "",
"hex_prefix": data[offset:offset + 32].hex(" "),
"ascii_preview": ascii_preview(data[offset:offset + 32]),
"note": "invalid length",
})
break
body = data[body_offset:min(end, len(data))]
values = [read_i32(body, i * 4) for i in range(4)]
uvalues = [read_u32(body, i * 4) for i in range(2)]
frames.append({
"index": str(index),
"offset": f"0x{offset:08x}",
"length": str(length),
"complete": "1" if complete else "0",
"i32_0": "" if values[0] is None else str(values[0]),
"i32_1": "" if values[1] is None else str(values[1]),
"i32_2": "" if values[2] is None else str(values[2]),
"i32_3": "" if values[3] is None else str(values[3]),
"u32_0_hex": "" if uvalues[0] is None else f"0x{uvalues[0]:08x}",
"u32_1_hex": "" if uvalues[1] is None else f"0x{uvalues[1]:08x}",
"hex_prefix": body[:32].hex(" "),
"ascii_preview": ascii_preview(body),
"note": "",
})
index += 1
if not complete:
break
offset = end
return frames
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("stream", type=Path)
parser.add_argument("--out", type=Path)
parser.add_argument("--max-frame-size", type=int, default=1024 * 1024)
args = parser.parse_args()
data = args.stream.read_bytes()
rows = parse_frames(data, args.max_frame_size)
header = [
"index",
"offset",
"length",
"complete",
"i32_0",
"i32_1",
"i32_2",
"i32_3",
"u32_0_hex",
"u32_1_hex",
"hex_prefix",
"ascii_preview",
"note",
]
if args.out:
args.out.parent.mkdir(parents=True, exist_ok=True)
handle = args.out.open("w", encoding="utf-8", newline="")
else:
import sys
handle = sys.stdout
with handle:
writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,138 @@
from __future__ import annotations
import argparse
import csv
import struct
from pathlib import Path
def i32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<i", data, offset)[0]
def u32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<I", data, offset)[0]
def ascii_preview(data: bytes, limit: int = 48) -> str:
return "".join(chr(value) if 32 <= value <= 126 else "." for value in data[:limit])
def announced_data_records_match(data: bytes, offset: int, announced_size: int) -> bool:
if announced_size < 0:
return False
total = 0
cursor = offset + 12
while total < announced_size and cursor + 4 <= len(data):
record_length = u32(data, cursor)
if record_length is None or record_length > 1024 * 1024:
return False
record_size = record_length + 4
if record_size <= 4 or cursor + record_size > len(data):
return False
total += record_size
cursor += record_size
return total == announced_size
def looks_like_control(data: bytes, offset: int) -> bool:
first = i32(data, offset)
second = i32(data, offset + 4)
third = i32(data, offset + 8)
if first is None or second is None or third is None:
return False
if first in {-1, -2}:
return True
if third != 0 or second < 0:
return False
return announced_data_records_match(data, offset, first)
def parse(data: bytes, max_record_size: int) -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
offset = 0
index = 0
while offset < len(data):
first = i32(data, offset)
if first is None:
break
if looks_like_control(data, offset):
record_type = "control"
size = 12
body = data[offset:offset + size]
elif first >= 0 and first <= max_record_size and offset + 4 + first <= len(data):
record_type = "data"
size = 4 + first
body = data[offset + 4:offset + size]
else:
record_type = "unknown"
size = min(32, len(data) - offset)
body = data[offset:offset + size]
rows.append({
"index": str(index),
"offset": f"0x{offset:08x}",
"record_type": record_type,
"record_size": str(size),
"first_i32": "" if first is None else str(first),
"second_i32": "" if (v := i32(data, offset + 4)) is None else str(v),
"third_i32": "" if (v := i32(data, offset + 8)) is None else str(v),
"body_i32_0": "" if (v := i32(body, 0)) is None else str(v),
"body_i32_1": "" if (v := i32(body, 4)) is None else str(v),
"body_i32_2": "" if (v := i32(body, 8)) is None else str(v),
"body_i32_3": "" if (v := i32(body, 12)) is None else str(v),
"hex_prefix": data[offset:offset + min(size, 80)].hex(" "),
"ascii_preview": ascii_preview(data[offset:offset + min(size, 80)]),
})
index += 1
if record_type == "unknown":
break
offset += size
return rows
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("stream", type=Path)
parser.add_argument("--out", type=Path, required=True)
parser.add_argument("--max-record-size", type=int, default=1024 * 1024)
args = parser.parse_args()
rows = parse(args.stream.read_bytes(), args.max_record_size)
header = [
"index",
"offset",
"record_type",
"record_size",
"first_i32",
"second_i32",
"third_i32",
"body_i32_0",
"body_i32_1",
"body_i32_2",
"body_i32_3",
"hex_prefix",
"ascii_preview",
]
args.out.parent.mkdir(parents=True, exist_ok=True)
with args.out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,149 @@
from __future__ import annotations
import argparse
import csv
import struct
from dataclasses import dataclass
from pathlib import Path
from scapy.all import IP, IPv6, Raw, TCP, rdpcap
@dataclass(frozen=True)
class Endpoint:
host: str
port: int
@classmethod
def parse(cls, text: str) -> "Endpoint":
host, port = text.rsplit(":", 1)
return cls(host, int(port))
def packet_hosts(packet) -> tuple[str, str] | None:
if IP in packet:
return str(packet[IP].src), str(packet[IP].dst)
if IPv6 in packet:
return str(packet[IPv6].src), str(packet[IPv6].dst)
return None
def read_i32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<i", data, offset)[0]
def read_u32(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<I", data, offset)[0]
def ascii_preview(data: bytes, limit: int = 48) -> str:
chars = []
for value in data[:limit]:
chars.append(chr(value) if 32 <= value <= 126 else ".")
return "".join(chars)
def decode_payload(payload: bytes) -> dict[str, str]:
first_i32 = read_i32(payload, 0)
second_i32 = read_i32(payload, 4)
third_i32 = read_i32(payload, 8)
length_match = first_i32 == len(payload) - 4 if first_i32 is not None else False
body = payload[4:] if length_match else payload
return {
"payload_len": str(len(payload)),
"first_i32": "" if first_i32 is None else str(first_i32),
"second_i32": "" if second_i32 is None else str(second_i32),
"third_i32": "" if third_i32 is None else str(third_i32),
"first_u32_hex": "" if (u := read_u32(payload, 0)) is None else f"0x{u:08x}",
"length_prefixed": "1" if length_match else "0",
"body_i32_0": "" if (v := read_i32(body, 0)) is None else str(v),
"body_i32_1": "" if (v := read_i32(body, 4)) is None else str(v),
"body_i32_2": "" if (v := read_i32(body, 8)) is None else str(v),
"body_i32_3": "" if (v := read_i32(body, 12)) is None else str(v),
"hex_prefix": payload[:64].hex(" "),
"ascii_preview": ascii_preview(payload),
}
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("pcapng", type=Path)
parser.add_argument("--a", required=True, help="Endpoint A, for example 127.0.0.1:57415")
parser.add_argument("--b", required=True, help="Endpoint B, for example 127.0.0.1:57433")
parser.add_argument("--out", type=Path, required=True)
args = parser.parse_args()
endpoint_a = Endpoint.parse(args.a)
endpoint_b = Endpoint.parse(args.b)
rows: list[dict[str, str]] = []
first_time: float | None = None
for frame, packet in enumerate(rdpcap(str(args.pcapng)), start=1):
if TCP not in packet or Raw not in packet:
continue
hosts = packet_hosts(packet)
if hosts is None:
continue
tcp = packet[TCP]
src = Endpoint(hosts[0], int(tcp.sport))
dst = Endpoint(hosts[1], int(tcp.dport))
if {src, dst} != {endpoint_a, endpoint_b}:
continue
payload = bytes(packet[Raw].load)
if not payload:
continue
packet_time = float(packet.time)
if first_time is None:
first_time = packet_time
decoded = decode_payload(payload)
decoded.update({
"frame": str(frame),
"time_relative": f"{packet_time - first_time:.9f}",
"direction": "a_to_b" if src == endpoint_a else "b_to_a",
"src": f"{src.host}:{src.port}",
"dst": f"{dst.host}:{dst.port}",
"seq": str(int(tcp.seq)),
})
rows.append(decoded)
header = [
"frame",
"time_relative",
"direction",
"src",
"dst",
"seq",
"payload_len",
"first_i32",
"second_i32",
"third_i32",
"first_u32_hex",
"length_prefixed",
"body_i32_0",
"body_i32_1",
"body_i32_2",
"body_i32_3",
"hex_prefix",
"ascii_preview",
]
args.out.parent.mkdir(parents=True, exist_ok=True)
with args.out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,209 @@
from __future__ import annotations
import argparse
import csv
import struct
from collections import defaultdict
from pathlib import Path
def load_rows(path: Path) -> list[dict[str, str]]:
with path.open("r", encoding="utf-8", newline="") as handle:
return list(csv.DictReader(handle, delimiter="\t"))
def body_bytes(row: dict[str, str]) -> bytes:
return bytes.fromhex(row["hex"])
def ordinal_groups(rows: list[dict[str, str]]) -> dict[tuple[str, str, str], list[dict[str, str]]]:
groups: dict[tuple[str, str, str], list[dict[str, str]]] = defaultdict(list)
for row in rows:
key = (row["direction"], row["record_type"], row["record_size"])
groups[key].append(row)
return groups
def i32_at(data: bytes, offset: int) -> int | None:
if offset + 4 > len(data):
return None
return struct.unpack_from("<i", data, offset)[0]
def diff_offsets(a: bytes, b: bytes) -> list[int]:
limit = min(len(a), len(b))
offsets = [index for index in range(limit) if a[index] != b[index]]
offsets.extend(range(limit, max(len(a), len(b))))
return offsets
def byte_pairs(offsets: list[int], a: bytes, b: bytes, limit: int) -> str:
pairs: list[str] = []
for offset in offsets[:limit]:
left = f"{a[offset]:02x}" if offset < len(a) else "--"
right = f"{b[offset]:02x}" if offset < len(b) else "--"
pairs.append(f"{offset}:{left}->{right}")
return " ".join(pairs)
def i32_diffs(offsets: list[int], a: bytes, b: bytes) -> str:
touched_words = sorted({offset - (offset % 4) for offset in offsets})
parts: list[str] = []
for offset in touched_words:
left = i32_at(a, offset)
right = i32_at(b, offset)
if left is None or right is None or left == right:
continue
parts.append(f"{offset}:{left}->{right}")
return " ".join(parts)
def filter_rows(
rows: list[dict[str, str]],
write_index: str | None,
min_complete: float | None,
max_complete: float | None,
) -> list[dict[str, str]]:
out: list[dict[str, str]] = []
for row in rows:
if write_index is not None and row.get("write_index") != write_index:
continue
if min_complete is not None or max_complete is not None:
value = row.get("packet_time_relative_to_complete", "")
if value == "":
continue
relative = float(value)
if min_complete is not None and relative < min_complete:
continue
if max_complete is not None and relative > max_complete:
continue
out.append(row)
return out
def compare_rows(a_rows: list[dict[str, str]], b_rows: list[dict[str, str]], max_byte_pairs: int) -> list[dict[str, str]]:
a_groups = ordinal_groups(a_rows)
b_groups = ordinal_groups(b_rows)
out: list[dict[str, str]] = []
for key in sorted(set(a_groups) | set(b_groups)):
left_rows = a_groups.get(key, [])
right_rows = b_groups.get(key, [])
count = max(len(left_rows), len(right_rows))
for ordinal in range(count):
if ordinal >= len(left_rows) or ordinal >= len(right_rows):
out.append({
"write_a": "" if ordinal >= len(left_rows) else left_rows[ordinal].get("write_index", ""),
"write_value_a": "" if ordinal >= len(left_rows) else left_rows[ordinal].get("write_value", ""),
"write_b": "" if ordinal >= len(right_rows) else right_rows[ordinal].get("write_index", ""),
"write_value_b": "" if ordinal >= len(right_rows) else right_rows[ordinal].get("write_value", ""),
"direction": key[0],
"record_type": key[1],
"record_size": key[2],
"ordinal": str(ordinal),
"status": "missing_a" if ordinal >= len(left_rows) else "missing_b",
"frame_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["frame"],
"frame_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["frame"],
"time_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["packet_time_relative_to_write"],
"time_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["packet_time_relative_to_write"],
"signature16_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["signature16"],
"signature16_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["signature16"],
"bytes_differ": "",
"diff_offsets": "",
"byte_pairs": "",
"i32_diffs": "",
"ascii_a": "" if ordinal >= len(left_rows) else left_rows[ordinal]["ascii_preview"],
"ascii_b": "" if ordinal >= len(right_rows) else right_rows[ordinal]["ascii_preview"],
})
continue
left = left_rows[ordinal]
right = right_rows[ordinal]
left_body = body_bytes(left)
right_body = body_bytes(right)
offsets = diff_offsets(left_body, right_body)
out.append({
"write_a": left.get("write_index", ""),
"write_value_a": left.get("write_value", ""),
"write_b": right.get("write_index", ""),
"write_value_b": right.get("write_value", ""),
"direction": key[0],
"record_type": key[1],
"record_size": key[2],
"ordinal": str(ordinal),
"status": "same" if not offsets else "different",
"frame_a": left["frame"],
"frame_b": right["frame"],
"time_a": left["packet_time_relative_to_write"],
"time_b": right["packet_time_relative_to_write"],
"signature16_a": left["signature16"],
"signature16_b": right["signature16"],
"bytes_differ": str(len(offsets)),
"diff_offsets": " ".join(str(offset) for offset in offsets[:max_byte_pairs]),
"byte_pairs": byte_pairs(offsets, left_body, right_body, max_byte_pairs),
"i32_diffs": i32_diffs(offsets, left_body, right_body),
"ascii_a": left["ascii_preview"],
"ascii_b": right["ascii_preview"],
})
return out
def write_rows(rows: list[dict[str, str]], out: Path) -> None:
out.parent.mkdir(parents=True, exist_ok=True)
header = [
"write_a",
"write_value_a",
"write_b",
"write_value_b",
"direction",
"record_type",
"record_size",
"ordinal",
"status",
"frame_a",
"frame_b",
"time_a",
"time_b",
"signature16_a",
"signature16_b",
"bytes_differ",
"diff_offsets",
"byte_pairs",
"i32_diffs",
"ascii_a",
"ascii_b",
]
with out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("left", type=Path)
parser.add_argument("right", type=Path)
parser.add_argument("--out", type=Path, default=Path("analysis/network/write-window-body-diff.tsv"))
parser.add_argument("--left-write-index")
parser.add_argument("--right-write-index")
parser.add_argument("--time-complete-min", type=float)
parser.add_argument("--time-complete-max", type=float)
parser.add_argument("--max-byte-pairs", type=int, default=64)
args = parser.parse_args()
left_rows = filter_rows(load_rows(args.left), args.left_write_index, args.time_complete_min, args.time_complete_max)
right_rows = filter_rows(load_rows(args.right), args.right_write_index, args.time_complete_min, args.time_complete_max)
rows = compare_rows(left_rows, right_rows, args.max_byte_pairs)
write_rows(rows, args.out)
same = sum(1 for row in rows if row["status"] == "same")
different = sum(1 for row in rows if row["status"] == "different")
missing = len(rows) - same - different
print(f"wrote {args.out}")
print(f"same={same} different={different} missing={missing}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+200
View File
@@ -0,0 +1,200 @@
from __future__ import annotations
import argparse
import csv
import datetime as dt
import json
import re
import struct
from pathlib import Path
JSON_RE = re.compile(r"(\{.*\})")
HARNESS_RE = re.compile(r"^(?P<timestamp>\S+)\t(?P<event>[^\t]+)\t(?P<payload>.*)$")
def iter_json_events(path: Path):
for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
match = JSON_RE.search(line)
if not match:
continue
try:
yield json.loads(match.group(1))
except json.JSONDecodeError:
continue
def harness_write_values(path: Path) -> tuple[str, list[str]]:
write_type = ""
values: list[str] = []
for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
match = HARNESS_RE.match(line)
if not match:
continue
payload = json.loads(match.group("payload"))
if match.group("event") == "harness.start":
write_type = str(payload.get("WriteType", ""))
continue
if match.group("event") != "mx.write.begin":
continue
value = payload.get("Value", {}).get("Value")
if value is not None:
values.append(str(value))
return write_type.lower(), values
def bytes_from_hex(text: str) -> bytes:
if not text:
return b""
return bytes.fromhex(text)
def value_needles(write_type: str, values: list[str]) -> list[tuple[str, bytes]]:
needles: list[tuple[str, bytes]] = []
for value in values:
if write_type in {"int", "integer", "int32"}:
needles.append((value, struct.pack("<i", int(value))))
elif write_type in {"bool", "boolean"}:
normalized = value.lower()
if normalized == "true":
needles.append((value + ":u8", b"\x01"))
needles.append((value + ":i32", struct.pack("<i", 1)))
needles.append((value + ":variant_bool", struct.pack("<h", -1)))
else:
needles.append((value + ":u8", b"\x00"))
needles.append((value + ":i32", struct.pack("<i", 0)))
needles.append((value + ":variant_bool", struct.pack("<h", 0)))
elif write_type in {"float", "single"}:
needles.append((value + ":f32", struct.pack("<f", float(value))))
elif write_type == "double":
needles.append((value + ":f64", struct.pack("<d", float(value))))
elif write_type == "string":
needles.append((value + ":utf16le", value.encode("utf-16le")))
needles.append((value + ":utf8", value.encode("utf-8")))
elif write_type in {"datetime", "time"}:
parsed = parse_datetime(value)
if parsed.tzinfo is None:
local_parsed = parsed
parsed = parsed.replace(tzinfo=dt.datetime.now().astimezone().tzinfo)
else:
local_parsed = parsed.replace(tzinfo=None)
utc = parsed.astimezone(dt.timezone.utc)
ole_epoch = dt.datetime(1899, 12, 30, tzinfo=dt.timezone.utc)
ole_days = (utc - ole_epoch).total_seconds() / 86400.0
ole_local_epoch = dt.datetime(1899, 12, 30)
ole_local_days = (local_parsed - ole_local_epoch).total_seconds() / 86400.0
filetime_epoch = dt.datetime(1601, 1, 1, tzinfo=dt.timezone.utc)
filetime_ticks = int((utc - filetime_epoch).total_seconds() * 10_000_000)
needles.append((value + ":oadate", struct.pack("<d", ole_days)))
needles.append((value + ":oadate-local", struct.pack("<d", ole_local_days)))
needles.append((value + ":filetime", struct.pack("<Q", filetime_ticks)))
needles.append((value + ":utf16le", value.encode("utf-16le")))
date_texts = {
local_parsed.strftime("%m/%d/%Y %H:%M:%S"),
local_parsed.strftime("%#m/%#d/%Y %#I:%M:%S %p") if hasattr(local_parsed, "strftime") else "",
}
for text in date_texts:
if text:
needles.append((text + ":utf16le", text.encode("utf-16le")))
else:
needles.append((value + ":utf16le", value.encode("utf-16le")))
needles.append((value + ":utf8", value.encode("utf-8")))
return [(label, needle) for label, needle in needles if needle]
def parse_datetime(value: str) -> dt.datetime:
try:
return dt.datetime.fromisoformat(value)
except ValueError:
pass
for fmt in ("%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p"):
try:
return dt.datetime.strptime(value, fmt)
except ValueError:
continue
raise ValueError(f"Unsupported datetime value {value!r}")
def value_hits(data: bytes, needles: list[tuple[str, bytes]]) -> str:
hits: list[str] = []
for label, needle in needles:
start = 0
while True:
offset = data.find(needle, start)
if offset < 0:
break
hits.append(f"{label}@{offset}")
start = offset + 1
return " ".join(hits)
def write_event_rows(events: list[dict], needles: list[tuple[str, bytes]], out: Path) -> None:
out.parent.mkdir(parents=True, exist_ok=True)
header = [
"time",
"event",
"module",
"name",
"ecx",
"retval",
"args",
"candidate_index",
"candidate_size",
"candidate_ptr",
"value_hits",
"hex",
]
with out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n")
writer.writeheader()
for event in events:
candidates = event.get("candidates") or [None]
for index, candidate in enumerate(candidates):
data = b""
if candidate:
data = bytes_from_hex(candidate.get("hex", ""))
writer.writerow({
"time": event.get("time", ""),
"event": event.get("event", ""),
"module": event.get("module", ""),
"name": event.get("name", ""),
"ecx": event.get("ecx", ""),
"retval": event.get("retval", ""),
"args": json.dumps(event.get("args", []), separators=(",", ":")),
"candidate_index": "" if candidate is None else str(index),
"candidate_size": "" if candidate is None else str(candidate.get("size", "")),
"candidate_ptr": "" if candidate is None else candidate.get("ptr", ""),
"value_hits": value_hits(data, needles),
"hex": "" if candidate is None else candidate.get("hex", ""),
})
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("capture_dir", type=Path)
parser.add_argument("--out", type=Path)
parser.add_argument("--frida-log", type=Path)
args = parser.parse_args()
capture_dir = args.capture_dir
write_type, values = harness_write_values(capture_dir / "harness.log")
needles = value_needles(write_type, values)
frida_log = args.frida_log
if frida_log is None:
candidates = [
capture_dir / "frida.stdout.jsonl",
capture_dir / "client-frida.stdout.jsonl",
capture_dir / "service-frida.stdout.jsonl",
]
frida_log = next((path for path in candidates if path.exists()), candidates[0])
events = list(iter_json_events(frida_log))
out = args.out or (capture_dir / "frida-events.tsv")
write_event_rows(events, needles, out)
print(f"wrote {out}")
print(f"frida_log={frida_log}")
print(f"events={len(events)} write_type={write_type} write_values={','.join(values)} needles={len(needles)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+165
View File
@@ -0,0 +1,165 @@
from __future__ import annotations
import argparse
import binascii
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from scapy.all import IPv6, Raw, TCP, rdpcap
SERVICE_PORT = 49704
@dataclass(frozen=True)
class Endpoint:
host: str
port: int
@dataclass
class PayloadPacket:
frame: int
time: float
src: Endpoint
dst: Endpoint
seq: int
ack: int
payload: bytes
def conversation_key(src: Endpoint, dst: Endpoint) -> tuple[Endpoint, Endpoint]:
return tuple(sorted((src, dst), key=lambda e: (e.host, e.port))) # type: ignore[return-value]
def sanitize_endpoint(endpoint: Endpoint) -> str:
return f"{endpoint.host.replace(':', '_')}_{endpoint.port}"
def extract_packets(path: Path) -> dict[tuple[Endpoint, Endpoint], list[PayloadPacket]]:
conversations: dict[tuple[Endpoint, Endpoint], list[PayloadPacket]] = defaultdict(list)
for index, packet in enumerate(rdpcap(str(path)), start=1):
if IPv6 not in packet or TCP not in packet:
continue
tcp = packet[TCP]
if int(tcp.sport) != SERVICE_PORT and int(tcp.dport) != SERVICE_PORT:
continue
if Raw not in packet:
continue
src = Endpoint(str(packet[IPv6].src), int(tcp.sport))
dst = Endpoint(str(packet[IPv6].dst), int(tcp.dport))
payload = bytes(packet[Raw].load)
if not payload:
continue
conversations[conversation_key(src, dst)].append(PayloadPacket(
frame=index,
time=float(packet.time),
src=src,
dst=dst,
seq=int(tcp.seq),
ack=int(tcp.ack),
payload=payload,
))
return conversations
def choose_conversation(conversations: dict[tuple[Endpoint, Endpoint], list[PayloadPacket]]) -> tuple[Endpoint, Endpoint]:
if not conversations:
raise RuntimeError("No IPv6 TCP payload conversations involving port 49704 were found.")
def score(item: tuple[tuple[Endpoint, Endpoint], list[PayloadPacket]]) -> tuple[int, int]:
_, packets = item
return sum(len(p.payload) for p in packets), len(packets)
return max(conversations.items(), key=score)[0]
def unique_direction_payloads(packets: list[PayloadPacket], src: Endpoint, dst: Endpoint) -> bytes:
seen: set[tuple[int, int]] = set()
chunks: list[tuple[int, float, bytes]] = []
for packet in packets:
if packet.src != src or packet.dst != dst:
continue
key = (packet.seq, len(packet.payload))
if key in seen:
continue
seen.add(key)
chunks.append((packet.seq, packet.time, packet.payload))
chunks.sort(key=lambda item: (item[0], item[1]))
return b"".join(chunk for _, _, chunk in chunks)
def ascii_preview(payload: bytes, limit: int = 48) -> str:
text = []
for value in payload[:limit]:
if 32 <= value <= 126:
text.append(chr(value))
else:
text.append(".")
return "".join(text)
def write_outputs(path: Path, out_dir: Path) -> None:
conversations = extract_packets(path)
selected = choose_conversation(conversations)
packets = conversations[selected]
packets.sort(key=lambda p: (p.time, p.frame))
a, b = selected
out_dir.mkdir(parents=True, exist_ok=True)
summary_lines = [
"capture\tselected_a\tselected_b\tpayload_packets\tpayload_bytes",
f"{path}\t{a.host}:{a.port}\t{b.host}:{b.port}\t{len(packets)}\t{sum(len(p.payload) for p in packets)}",
"",
"conversation_a\tconversation_b\tpayload_packets\tpayload_bytes",
]
for (left, right), conv_packets in sorted(
conversations.items(),
key=lambda item: sum(len(p.payload) for p in item[1]),
reverse=True,
):
summary_lines.append(
f"{left.host}:{left.port}\t{right.host}:{right.port}\t"
f"{len(conv_packets)}\t{sum(len(p.payload) for p in conv_packets)}"
)
(out_dir / "nmx-conversations.tsv").write_text("\n".join(summary_lines) + "\n", encoding="utf-8")
packet_lines = [
"frame\ttime_relative\tfrom_service\tsrc\tsport\tdst\tdport\tseq\tack\tpayload_len\thex_prefix\tascii_preview"
]
first_time = packets[0].time if packets else 0.0
for packet in packets:
from_service = packet.src.port == SERVICE_PORT
packet_lines.append(
f"{packet.frame}\t{packet.time - first_time:.9f}\t{int(from_service)}\t"
f"{packet.src.host}\t{packet.src.port}\t{packet.dst.host}\t{packet.dst.port}\t"
f"{packet.seq}\t{packet.ack}\t{len(packet.payload)}\t"
f"{binascii.hexlify(packet.payload[:32]).decode('ascii')}\t{ascii_preview(packet.payload)}"
)
(out_dir / "nmx-payload-packets.tsv").write_text("\n".join(packet_lines) + "\n", encoding="utf-8")
for src, dst in ((a, b), (b, a)):
data = unique_direction_payloads(packets, src, dst)
name = f"nmx-stream-{sanitize_endpoint(src)}-to-{sanitize_endpoint(dst)}.bin"
(out_dir / name).write_bytes(data)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("pcapng", type=Path)
parser.add_argument("--out-dir", type=Path)
args = parser.parse_args()
out_dir = args.out_dir or args.pcapng.parent
write_outputs(args.pcapng, out_dir)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+107
View File
@@ -0,0 +1,107 @@
from __future__ import annotations
import argparse
import csv
import uuid
from pathlib import Path
import pefile
IMAGE_BASE = 0x10000000
PROXY_VTBL_LIST = 0x10007CC8
STUB_VTBL_LIST = 0x10007CE4
NAME_LIST = 0x10007D00
INTERFACE_COUNT = 6
def u16(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset:offset + 2], "little")
def u32(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset:offset + 4], "little")
class PeView:
def __init__(self, path: Path):
self.pe = pefile.PE(str(path))
self.data = bytes(self.pe.__data__)
self.base = self.pe.OPTIONAL_HEADER.ImageBase
def offset(self, va: int) -> int:
return self.pe.get_offset_from_rva(va - self.base)
def u16(self, va: int) -> int:
return u16(self.data, self.offset(va))
def u32(self, va: int) -> int:
return u32(self.data, self.offset(va))
def guid(self, va: int) -> str:
return str(uuid.UUID(bytes_le=self.data[self.offset(va):self.offset(va) + 16])).upper()
def asciiz(self, va: int) -> str:
start = self.offset(va)
end = self.data.index(b"\x00", start)
return self.data[start:end].decode("ascii")
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument(
"--dll",
type=Path,
default=Path(r"C:\Program Files (x86)\ArchestrA\Framework\Bin\NmxSvcps.dll"),
)
parser.add_argument("--out", type=Path, default=Path("analysis/proxy/nmxsvcps-proxy-layout.tsv"))
args = parser.parse_args()
pe = PeView(args.dll)
args.out.parent.mkdir(parents=True, exist_ok=True)
with args.out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.writer(handle, delimiter="\t", lineterminator="\n")
writer.writerow([
"interface_index",
"name",
"iid",
"iid_va",
"method_count",
"user_method_count",
"proxy_vtbl_va",
"stub_vtbl_va",
"proxy_info_va",
"proxy_info_words",
])
for index in range(INTERFACE_COUNT):
name_va = pe.u32(NAME_LIST + index * 4)
name = pe.asciiz(name_va)
proxy_vtbl = pe.u32(PROXY_VTBL_LIST + index * 4)
stub_vtbl = pe.u32(STUB_VTBL_LIST + index * 4)
proxy_info = pe.u32(proxy_vtbl)
iid_va = pe.u32(proxy_vtbl + 4)
method_count = pe.u32(stub_vtbl + 8)
user_method_count = max(0, method_count - 3)
proxy_info_words = [pe.u32(proxy_info + i * 4) for i in range(8)]
writer.writerow([
index,
name,
pe.guid(iid_va),
f"0x{iid_va:08x}",
method_count,
user_method_count,
f"0x{proxy_vtbl:08x}",
f"0x{stub_vtbl:08x}",
f"0x{proxy_info:08x}",
",".join(f"0x{word:08x}" for word in proxy_info_words),
])
print(f"wrote {args.out}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,314 @@
from __future__ import annotations
import argparse
import csv
import re
import uuid
from dataclasses import dataclass
from pathlib import Path
import pefile
PROXY_VTBL_LIST = 0x10007CC8
STUB_VTBL_LIST = 0x10007CE4
NAME_LIST = 0x10007D00
INTERFACE_COUNT = 6
KNOWN_METHODS = {
"INmxService2": [
"RegisterEngine",
"UnRegisterEngine",
"Connect",
"TransferData",
"AddSubscriberEngine",
"RemoveSubscriberEngine",
"SetHeartbeatSendInterval",
"RegisterEngine2",
"GetPartnerVersion",
],
"INmxSvcStatistics": [
"GetNmxSvcStatistics",
"ResetSvcStatistics",
],
"INmxStatus": [
"OPENCONNECTION",
"CloseConnection",
"GetConnectionStatus",
],
"INmxService": [
"RegisterEngine",
"UnRegisterEngine",
"Connect",
"TransferData",
"AddSubscriberEngine",
"RemoveSubscriberEngine",
"SetHeartbeatSendInterval",
],
"INmxNotify": [
"ConnectionEstablished",
"ConnectionClosed",
],
"INmxSvcCallback": [
"DataReceived",
"StatusReceived",
],
}
FC_TYPES = {
0x02: "FC_CHAR",
0x03: "FC_SMALL",
0x04: "FC_USMALL",
0x05: "FC_WCHAR",
0x06: "FC_SHORT",
0x07: "FC_USHORT",
0x08: "FC_LONG",
0x09: "FC_ULONG",
0x0A: "FC_FLOAT",
0x0B: "FC_HYPER",
0x0C: "FC_DOUBLE",
0x0D: "FC_ENUM16",
0x0E: "FC_ENUM32",
0x10: "FC_ERROR_STATUS_T",
0x11: "FC_RP",
0x12: "FC_UP",
0x14: "FC_FP",
0x15: "FC_STRUCT",
0x1B: "FC_CARRAY",
0x2F: "FC_IP",
0x36: "FC_BYTE_COUNT_POINTER",
0x46: "FC_NO_REPEAT",
0x4C: "FC_EMBEDDED_COMPLEX",
0x5A: "FC_CONSTANT_IID",
0x5B: "FC_END",
0x5C: "FC_PAD",
}
PARAM_FLAGS = [
(0x0001, "must_size"),
(0x0002, "must_free"),
(0x0008, "in"),
(0x0010, "out"),
(0x0020, "return"),
(0x0040, "base_type"),
(0x0080, "by_value"),
(0x0100, "simple_ref"),
]
def u16(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset:offset + 2], "little")
def u32(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset:offset + 4], "little")
class PeView:
def __init__(self, path: Path):
self.pe = pefile.PE(str(path))
self.data = bytes(self.pe.__data__)
self.base = self.pe.OPTIONAL_HEADER.ImageBase
def offset(self, va: int) -> int:
return self.pe.get_offset_from_rva(va - self.base)
def slice(self, va: int, length: int) -> bytes:
offset = self.offset(va)
return self.data[offset:offset + length]
def u16(self, va: int) -> int:
return u16(self.data, self.offset(va))
def u32(self, va: int) -> int:
return u32(self.data, self.offset(va))
def guid(self, va: int) -> str:
return str(uuid.UUID(bytes_le=self.slice(va, 16))).upper()
def asciiz(self, va: int) -> str:
start = self.offset(va)
end = self.data.index(b"\x00", start)
return self.data[start:end].decode("ascii")
@dataclass(frozen=True)
class InterfaceInfo:
index: int
name: str
iid: str
method_count: int
user_method_count: int
proxy_info_va: int
stub_desc_va: int
proc_format_va: int
offset_table_va: int
type_format_va: int
def parse_interface(pe: PeView, index: int) -> InterfaceInfo:
name_va = pe.u32(NAME_LIST + index * 4)
name = pe.asciiz(name_va)
proxy_vtbl = pe.u32(PROXY_VTBL_LIST + index * 4)
stub_vtbl = pe.u32(STUB_VTBL_LIST + index * 4)
proxy_info = pe.u32(proxy_vtbl)
iid_va = pe.u32(proxy_vtbl + 4)
method_count = pe.u32(stub_vtbl + 8)
stub_desc = pe.u32(proxy_info)
proc_format = pe.u32(proxy_info + 4)
offset_table = pe.u32(proxy_info + 8)
type_format = pe.u32(stub_desc + 0x20)
return InterfaceInfo(
index=index,
name=name,
iid=pe.guid(iid_va),
method_count=method_count,
user_method_count=max(0, method_count - 3),
proxy_info_va=proxy_info,
stub_desc_va=stub_desc,
proc_format_va=proc_format,
offset_table_va=offset_table,
type_format_va=type_format,
)
def hex_bytes(data: bytes) -> str:
return data.hex(" ")
def safe_name(value: str) -> str:
return re.sub(r"[^A-Za-z0-9_.-]+", "_", value)
def flags_text(value: int) -> str:
names = [name for bit, name in PARAM_FLAGS if value & bit]
remainder = value & ~sum(bit for bit, _ in PARAM_FLAGS)
if remainder:
names.append(f"0x{remainder:04x}")
return "|".join(names) if names else "none"
def type_text(type_or_offset: int, type_format_va: int, pe: PeView, is_base_type: bool) -> str:
if is_base_type:
return FC_TYPES.get(type_or_offset, f"FC_0x{type_or_offset:02x}")
type_va = type_format_va + type_or_offset
raw = pe.slice(type_va, 18)
annotated = []
for byte in raw:
annotated.append(FC_TYPES.get(byte, f"0x{byte:02x}"))
return f"type+0x{type_or_offset:04x} @{type_va:#010x} [{', '.join(annotated)}]"
def parse_proc(pe: PeView, interface: InterfaceInfo, method_index: int, method_name: str, proc_offset: int) -> dict[str, object]:
va = interface.proc_format_va + proc_offset
header = pe.slice(va, 24)
param_count = header[15]
params = []
pos = 24
for index in range(param_count):
desc = pe.slice(va + pos, 6)
flags = u16(desc, 0)
stack_offset = u16(desc, 2)
type_or_offset = u16(desc, 4)
is_base_type = bool(flags & 0x0040)
params.append(
f"p{index}:flags=0x{flags:04x}({flags_text(flags)}),"
f"stack={stack_offset},"
f"type={type_text(type_or_offset, interface.type_format_va, pe, is_base_type)}"
)
pos += 6
return {
"interface_index": interface.index,
"interface_name": interface.name,
"iid": interface.iid,
"method_index": method_index,
"method_name": method_name,
"proc_offset": f"0x{proc_offset:04x}",
"proc_va": f"0x{va:08x}",
"opnum": u16(header, 6),
"x86_stack_size": u16(header, 8),
"client_buffer_size": u16(header, 10),
"server_buffer_size": u16(header, 12),
"proc_flags": f"0x{header[14]:02x}",
"param_count": param_count,
"oi2_flags": f"0x{header[16]:02x}",
"oi2_ext_flags": f"0x{header[17]:02x}",
"raw_header": hex_bytes(header),
"params": "; ".join(params),
}
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument(
"--dll",
type=Path,
default=Path(r"C:\Program Files (x86)\ArchestrA\Framework\Bin\NmxSvcps.dll"),
)
parser.add_argument("--out", type=Path, default=Path("analysis/proxy/nmxsvcps-procedures.tsv"))
parser.add_argument("--type-dir", type=Path, default=Path("analysis/proxy/type-format-snippets"))
args = parser.parse_args()
pe = PeView(args.dll)
interfaces = [parse_interface(pe, i) for i in range(INTERFACE_COUNT)]
args.out.parent.mkdir(parents=True, exist_ok=True)
args.type_dir.mkdir(parents=True, exist_ok=True)
fieldnames = [
"interface_index",
"interface_name",
"iid",
"method_index",
"method_name",
"proc_offset",
"proc_va",
"opnum",
"x86_stack_size",
"client_buffer_size",
"server_buffer_size",
"proc_flags",
"param_count",
"oi2_flags",
"oi2_ext_flags",
"raw_header",
"params",
]
with args.out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, delimiter="\t", fieldnames=fieldnames, lineterminator="\n")
writer.writeheader()
for interface in interfaces:
names = KNOWN_METHODS[interface.name]
offsets = [pe.u16(interface.offset_table_va + i * 2) for i in range(interface.method_count)]
user_offsets = offsets[3:3 + interface.user_method_count]
if len(names) != len(user_offsets):
raise ValueError(f"{interface.name} method names do not match recovered offset count")
for method_index, (method_name, proc_offset) in enumerate(zip(names, user_offsets), start=3):
writer.writerow(parse_proc(pe, interface, method_index, method_name, proc_offset))
type_snippet = pe.slice(interface.type_format_va, 0x98)
(args.type_dir / f"{interface.index}-{safe_name(interface.name)}.txt").write_text(
"\n".join([
f"{interface.name}",
f"iid={interface.iid}",
f"stub_desc=0x{interface.stub_desc_va:08x}",
f"proc_format=0x{interface.proc_format_va:08x}",
f"offset_table=0x{interface.offset_table_va:08x}",
f"type_format=0x{interface.type_format_va:08x}",
f"type_bytes={hex_bytes(type_snippet)}",
"",
]),
encoding="utf-8",
)
print(f"wrote {args.out}")
print(f"wrote type snippets under {args.type_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,150 @@
from __future__ import annotations
import argparse
import binascii
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from scapy.all import IP, IPv6, Raw, TCP, rdpcap
@dataclass(frozen=True)
class Endpoint:
host: str
port: int
@dataclass
class PayloadPacket:
frame: int
time: float
src: Endpoint
dst: Endpoint
seq: int
payload: bytes
def conversation_key(src: Endpoint, dst: Endpoint) -> tuple[Endpoint, Endpoint]:
return tuple(sorted((src, dst), key=lambda e: (e.host, e.port))) # type: ignore[return-value]
def sanitize_endpoint(endpoint: Endpoint) -> str:
return f"{endpoint.host.replace(':', '_').replace('.', '_')}_{endpoint.port}"
def packet_hosts(packet) -> tuple[str, str] | None:
if IP in packet:
return str(packet[IP].src), str(packet[IP].dst)
if IPv6 in packet:
return str(packet[IPv6].src), str(packet[IPv6].dst)
return None
def extract_packets(path: Path) -> dict[tuple[Endpoint, Endpoint], list[PayloadPacket]]:
conversations: dict[tuple[Endpoint, Endpoint], list[PayloadPacket]] = defaultdict(list)
for index, packet in enumerate(rdpcap(str(path)), start=1):
if TCP not in packet or Raw not in packet:
continue
hosts = packet_hosts(packet)
if hosts is None:
continue
src_host, dst_host = hosts
tcp = packet[TCP]
src = Endpoint(src_host, int(tcp.sport))
dst = Endpoint(dst_host, int(tcp.dport))
payload = bytes(packet[Raw].load)
if not payload:
continue
conversations[conversation_key(src, dst)].append(PayloadPacket(
frame=index,
time=float(packet.time),
src=src,
dst=dst,
seq=int(tcp.seq),
payload=payload,
))
return conversations
def ascii_preview(payload: bytes, limit: int = 48) -> str:
chars = []
for value in payload[:limit]:
chars.append(chr(value) if 32 <= value <= 126 else ".")
return "".join(chars)
def unique_direction_payloads(packets: list[PayloadPacket], src: Endpoint, dst: Endpoint) -> bytes:
seen: set[tuple[int, int]] = set()
chunks: list[tuple[int, float, bytes]] = []
for packet in packets:
if packet.src != src or packet.dst != dst:
continue
key = (packet.seq, len(packet.payload))
if key in seen:
continue
seen.add(key)
chunks.append((packet.seq, packet.time, packet.payload))
chunks.sort(key=lambda item: (item[0], item[1]))
return b"".join(chunk for _, _, chunk in chunks)
def write_outputs(path: Path, out_dir: Path, top: int) -> None:
conversations = extract_packets(path)
out_dir.mkdir(parents=True, exist_ok=True)
ranked = sorted(
conversations.items(),
key=lambda item: sum(len(packet.payload) for packet in item[1]),
reverse=True,
)
lines = ["conversation_a\tconversation_b\tpayload_packets\tpayload_bytes\tfirst_relative\tlast_relative"]
first_capture_time = min((packet.time for packets in conversations.values() for packet in packets), default=0.0)
for (left, right), packets in ranked:
times = [packet.time for packet in packets]
lines.append(
f"{left.host}:{left.port}\t{right.host}:{right.port}\t"
f"{len(packets)}\t{sum(len(packet.payload) for packet in packets)}\t"
f"{min(times) - first_capture_time:.9f}\t{max(times) - first_capture_time:.9f}"
)
(out_dir / "tcp-conversations.tsv").write_text("\n".join(lines) + "\n", encoding="utf-8")
packet_lines = ["frame\ttime_relative\tsrc\tdst\tseq\tpayload_len\thex_prefix\tascii_preview"]
for _, packets in ranked[:top]:
first_time = min(packet.time for packet in packets)
for packet in sorted(packets, key=lambda item: (item.time, item.frame)):
packet_lines.append(
f"{packet.frame}\t{packet.time - first_time:.9f}\t"
f"{packet.src.host}:{packet.src.port}\t{packet.dst.host}:{packet.dst.port}\t"
f"{packet.seq}\t{len(packet.payload)}\t"
f"{binascii.hexlify(packet.payload[:32]).decode('ascii')}\t{ascii_preview(packet.payload)}"
)
(out_dir / "tcp-payload-packets.tsv").write_text("\n".join(packet_lines) + "\n", encoding="utf-8")
for (left, right), packets in ranked[:top]:
for src, dst in ((left, right), (right, left)):
data = unique_direction_payloads(packets, src, dst)
if not data:
continue
name = f"tcp-stream-{sanitize_endpoint(src)}-to-{sanitize_endpoint(dst)}.bin"
(out_dir / name).write_bytes(data)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("pcapng", type=Path)
parser.add_argument("--out-dir", type=Path)
parser.add_argument("--top", type=int, default=8)
args = parser.parse_args()
write_outputs(args.pcapng, args.out_dir or args.pcapng.parent, args.top)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+211
View File
@@ -0,0 +1,211 @@
from __future__ import annotations
import argparse
import datetime as dt
import re
import uuid
from pathlib import Path
import pefile
ASCII_RE = re.compile(rb"[\x20-\x7e]{5,}")
UTF16_RE = re.compile(rb"(?:[\x20-\x7e]\x00){5,}")
def read_c_string(data: bytes, offset: int, limit: int = 128) -> str:
chunk = data[offset:offset + limit]
end = chunk.find(b"\x00")
if end >= 0:
chunk = chunk[:end]
return chunk.decode("ascii", errors="replace")
def timestamp_text(value: int) -> str:
try:
return dt.datetime.fromtimestamp(value, tz=dt.timezone.utc).isoformat()
except Exception:
return str(value)
def hex_context(data: bytes, offset: int, size: int) -> str:
start = max(0, offset - size)
end = min(len(data), offset + size)
return data[start:end].hex(" ")
def collect_strings(data: bytes, needles: list[str]) -> list[str]:
lowered = [needle.lower() for needle in needles]
matches: list[str] = []
for raw in ASCII_RE.findall(data):
text = raw.decode("ascii", errors="replace")
if any(needle in text.lower() for needle in lowered):
matches.append(text)
for raw in UTF16_RE.findall(data):
text = raw.decode("utf-16le", errors="replace")
if any(needle in text.lower() for needle in lowered):
matches.append(text)
return sorted(set(matches))
def format_exports(pe: pefile.PE) -> list[str]:
if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
return ["(none)"]
rows = ["| Ordinal | RVA | Name |", "| ---: | ---: | --- |"]
for symbol in pe.DIRECTORY_ENTRY_EXPORT.symbols:
name = symbol.name.decode("utf-8", errors="replace") if symbol.name else ""
rows.append(f"| {symbol.ordinal} | 0x{symbol.address:08x} | `{name}` |")
return rows
def format_imports(pe: pefile.PE) -> list[str]:
if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
return ["(none)"]
lines: list[str] = []
for entry in pe.DIRECTORY_ENTRY_IMPORT:
dll = entry.dll.decode("utf-8", errors="replace")
names: list[str] = []
for imp in entry.imports:
if imp.name:
names.append(imp.name.decode("utf-8", errors="replace"))
else:
names.append(f"ord_{imp.ordinal}")
lines.append(f"- `{dll}`: " + ", ".join(f"`{name}`" for name in names))
return lines
def format_resources(pe: pefile.PE) -> list[str]:
if not hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
return ["(none)"]
rows = ["| Type | ID/name | Lang | RVA | Size |", "| --- | --- | ---: | ---: | ---: |"]
for type_entry in pe.DIRECTORY_ENTRY_RESOURCE.entries:
type_name = str(type_entry.name) if type_entry.name is not None else str(type_entry.struct.Id)
if not hasattr(type_entry, "directory"):
continue
for name_entry in type_entry.directory.entries:
name = str(name_entry.name) if name_entry.name is not None else str(name_entry.struct.Id)
if not hasattr(name_entry, "directory"):
continue
for lang_entry in name_entry.directory.entries:
data = lang_entry.data.struct
rows.append(
f"| `{type_name}` | `{name}` | {lang_entry.struct.Id} | "
f"0x{data.OffsetToData:08x} | {data.Size} |"
)
return rows
def guid_hits(data: bytes, guid_text: str) -> list[str]:
value = uuid.UUID(guid_text)
patterns = [
("text-lower", str(value).encode("ascii")),
("text-upper", str(value).upper().encode("ascii")),
("utf16-lower", str(value).encode("utf-16le")),
("utf16-upper", str(value).upper().encode("utf-16le")),
("guid-bytes-le", value.bytes_le),
("guid-bytes-be", value.bytes),
]
lines: list[str] = []
for label, pattern in patterns:
start = 0
while True:
offset = data.find(pattern, start)
if offset < 0:
break
lines.append(
f"- `{guid_text}` `{label}` at file offset `0x{offset:08x}`: "
f"`{hex_context(data, offset, 32)}`"
)
start = offset + 1
return lines
def report_one(path: Path, guids: list[str]) -> str:
data = path.read_bytes()
pe = pefile.PE(str(path), fast_load=False)
pe.parse_data_directories()
machine = pe.FILE_HEADER.Machine
bitness = "x64" if machine == 0x8664 else "x86" if machine == 0x14c else f"machine 0x{machine:04x}"
lines = [
f"# {path.name}",
"",
f"- Path: `{path}`",
f"- Size: {len(data)} bytes",
f"- Machine: {bitness}",
f"- PE timestamp: {timestamp_text(pe.FILE_HEADER.TimeDateStamp)}",
f"- ImageBase: `0x{pe.OPTIONAL_HEADER.ImageBase:08x}`",
"",
"## Exports",
"",
*format_exports(pe),
"",
"## Imports",
"",
*format_imports(pe),
"",
"## Resources",
"",
*format_resources(pe),
"",
"## GUID hits",
"",
]
hit_lines: list[str] = []
for guid in guids:
hit_lines.extend(guid_hits(data, guid))
lines.extend(hit_lines or ["(none)"])
needles = [
"ndr",
"proxy",
"stub",
"rpc",
"interface",
"nmx",
"lmx",
"putrequest",
"getresponse",
"registeritems",
"write",
*guids,
]
strings = collect_strings(data, needles)
lines.extend(["", "## Interesting strings", ""])
lines.extend(f"- `{text}`" for text in strings[:200])
if len(strings) > 200:
lines.append(f"- ... {len(strings) - 200} more")
return "\n".join(lines) + "\n"
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("pe", type=Path, nargs="+")
parser.add_argument("--guid", action="append", default=[])
parser.add_argument("--out-dir", type=Path)
args = parser.parse_args()
if args.out_dir:
args.out_dir.mkdir(parents=True, exist_ok=True)
for path in args.pe:
report = report_one(path, args.guid)
if args.out_dir:
(args.out_dir / f"{path.name}.md").write_text(report, encoding="utf-8")
else:
print(report)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+123
View File
@@ -0,0 +1,123 @@
from __future__ import annotations
import argparse
import json
import re
import struct
from pathlib import Path
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
def load_frida_events(path: Path) -> list[dict]:
events: list[dict] = []
for raw in path.read_text(errors="replace").splitlines():
line = ANSI_RE.sub("", raw).strip()
index = line.find("{")
if index < 0:
continue
try:
outer = json.loads(line[index:])
except json.JSONDecodeError:
continue
payload = outer.get("payload") if outer.get("type") == "send" else outer
if isinstance(payload, str):
try:
payload = json.loads(payload)
except json.JSONDecodeError:
continue
if isinstance(payload, dict) and payload.get("event"):
events.append(payload)
return events
def first_candidate(event: dict, min_size: int = 20, max_size: int = 4096) -> bytes | None:
for candidate in event.get("candidates") or []:
hex_text = candidate.get("hex") or ""
size = int(candidate.get("size") or 0)
if hex_text and min_size <= size <= max_size:
return bytes.fromhex(hex_text)
return None
def extract_needles(events: list[dict], scalar_int: int | None) -> list[tuple[str, bytes]]:
start = None
for index, event in enumerate(events):
if event.get("name", "").startswith("CLMXProxyServer.Write") and event.get("args"):
start = index
break
needles: list[tuple[str, bytes]] = []
if scalar_int is not None:
needles.append((f"int32:{scalar_int}", struct.pack("<i", scalar_int)))
if start is None:
return needles
for event in events[start + 1:]:
name = event.get("name")
if name not in ("CNmxAdapter.PutRequest", "CNmxAdapter.TransferData", "CNmxAdapter.ProcessDataReceived"):
continue
body = first_candidate(event)
if body is None:
continue
if name == "CNmxAdapter.PutRequest" and body.startswith(b"\x37\x01"):
needles.append(("putrequest-body", body))
elif name == "CNmxAdapter.TransferData" and (scalar_int is None or struct.pack("<i", scalar_int) in body):
needles.append(("transferdata-body", body))
elif name == "CNmxAdapter.ProcessDataReceived" and (scalar_int is None or struct.pack("<i", scalar_int) in body):
needles.append(("callback-body", body))
found_names = {name for name, _ in needles}
if {"putrequest-body", "transferdata-body", "callback-body"}.issubset(found_names):
break
return needles
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("capture_dir", type=Path)
parser.add_argument("--scalar-int", type=int)
parser.add_argument("--frida-log", type=Path)
args = parser.parse_args()
frida_log = args.frida_log
if frida_log is None:
candidates = [
args.capture_dir / "frida.stdout.jsonl",
args.capture_dir / "client-frida.stdout.jsonl",
]
frida_log = next((path for path in candidates if path.exists()), candidates[0])
events = load_frida_events(frida_log)
needles = extract_needles(events, args.scalar_int)
stream_paths = sorted(args.capture_dir.glob("tcp-stream-*.bin"))
lines = ["needle\tneedle_len\tstream\toffset"]
for name, needle in needles:
hit_count = 0
for stream_path in stream_paths:
data = stream_path.read_bytes()
offset = data.find(needle)
if offset >= 0:
lines.append(f"{name}\t{len(needle)}\t{stream_path.name}\t{offset}")
hit_count += 1
if hit_count == 0:
lines.append(f"{name}\t{len(needle)}\t\t-1")
output = args.capture_dir / "frida-to-tcp-map.tsv"
output.write_text("\n".join(lines) + "\n", encoding="utf-8")
print(f"wrote {output}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+241
View File
@@ -0,0 +1,241 @@
from __future__ import annotations
import argparse
import csv
from dataclasses import dataclass
from pathlib import Path
import uuid
PTYPE_NAMES = {
0: "request",
2: "response",
3: "fault",
11: "bind",
12: "bind_ack",
14: "alter_context",
15: "alter_context_resp",
}
@dataclass
class DceRpcPdu:
stream: str
offset: int
ptype: int
flags: int
frag_len: int
auth_len: int
call_id: int
alloc_hint: int | None
context_id: int | None
opnum: int | None
stub_offset: int | None
stub_len: int
stub: bytes
context_interface_uuid: str = ""
context_interface_version: str = ""
bind_contexts: str = ""
def u16le(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset:offset + 2], "little")
def u32le(data: bytes, offset: int) -> int:
return int.from_bytes(data[offset:offset + 4], "little")
def guid_le(data: bytes, offset: int) -> str:
return str(uuid.UUID(bytes_le=data[offset:offset + 16])).upper()
def looks_like_header(data: bytes, offset: int) -> bool:
if offset + 16 > len(data):
return False
if data[offset] != 5 or data[offset + 1] != 0:
return False
ptype = data[offset + 2]
if ptype not in PTYPE_NAMES:
return False
if data[offset + 4:offset + 8] != b"\x10\x00\x00\x00":
return False
frag_len = u16le(data, offset + 8)
auth_len = u16le(data, offset + 10)
if frag_len < 16 or offset + frag_len > len(data):
return False
if auth_len > frag_len:
return False
return True
def parse_pdu(stream: str, data: bytes, offset: int) -> DceRpcPdu:
ptype = data[offset + 2]
flags = data[offset + 3]
frag_len = u16le(data, offset + 8)
auth_len = u16le(data, offset + 10)
call_id = u32le(data, offset + 12)
alloc_hint = None
context_id = None
opnum = None
stub_offset = None
stub_len = 0
stub = b""
if ptype == 0 and frag_len >= 24:
alloc_hint = u32le(data, offset + 16)
context_id = u16le(data, offset + 20)
opnum = u16le(data, offset + 22)
stub_offset = offset + 24
elif ptype in (2, 3) and frag_len >= 24:
alloc_hint = u32le(data, offset + 16)
context_id = u16le(data, offset + 20)
stub_offset = offset + 24
if stub_offset is not None:
auth_pad_and_trailer = auth_len + (8 if auth_len else 0)
pdu_end = offset + frag_len - auth_pad_and_trailer
if pdu_end >= stub_offset:
stub = data[stub_offset:pdu_end]
stub_len = len(stub)
return DceRpcPdu(
stream=stream,
offset=offset,
ptype=ptype,
flags=flags,
frag_len=frag_len,
auth_len=auth_len,
call_id=call_id,
alloc_hint=alloc_hint,
context_id=context_id,
opnum=opnum,
stub_offset=stub_offset,
stub_len=stub_len,
stub=stub,
)
def parse_bind_contexts(data: bytes, offset: int, frag_len: int) -> dict[int, tuple[str, str]]:
contexts: dict[int, tuple[str, str]] = {}
if frag_len < 28:
return contexts
count = data[offset + 24]
cursor = offset + 28
end = offset + frag_len
for _ in range(count):
if cursor + 24 > end:
break
context_id = u16le(data, cursor)
transfer_count = data[cursor + 2]
cursor += 4
abstract_uuid = guid_le(data, cursor)
version = f"{u16le(data, cursor + 16)}.{u16le(data, cursor + 18)}"
cursor += 20
cursor += transfer_count * 20
contexts[context_id] = (abstract_uuid, version)
return contexts
def parse_stream(stream_path: Path) -> list[DceRpcPdu]:
data = stream_path.read_bytes()
rows: list[DceRpcPdu] = []
contexts: dict[int, tuple[str, str]] = {}
offset = 0
while offset < len(data):
if looks_like_header(data, offset):
pdu = parse_pdu(stream_path.name, data, offset)
if pdu.ptype in (11, 14):
parsed_contexts = parse_bind_contexts(data, offset, pdu.frag_len)
contexts.update(parsed_contexts)
pdu.bind_contexts = ";".join(
f"{context_id}:{iid}:{version}"
for context_id, (iid, version) in sorted(parsed_contexts.items())
)
if pdu.context_id is not None and pdu.context_id in contexts:
iid, version = contexts[pdu.context_id]
pdu.context_interface_uuid = iid
pdu.context_interface_version = version
rows.append(pdu)
offset += pdu.frag_len
continue
offset += 1
return rows
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("capture_dir", type=Path)
parser.add_argument("--scalar-int", type=int)
args = parser.parse_args()
paths = sorted(args.capture_dir.glob("tcp-stream-*49704*.bin"))
rows: list[DceRpcPdu] = []
for path in paths:
rows.extend(parse_stream(path))
scalar = args.scalar_int.to_bytes(4, "little", signed=True) if args.scalar_int is not None else None
out_path = args.capture_dir / "dcerpc-stream-pdus.tsv"
with out_path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.writer(handle, delimiter="\t", lineterminator="\n")
writer.writerow([
"stream",
"offset",
"ptype",
"ptype_name",
"flags",
"frag_len",
"auth_len",
"call_id",
"alloc_hint",
"context_id",
"context_interface_uuid",
"context_interface_version",
"opnum",
"stub_offset",
"stub_len",
"scalar_hit_offsets",
"bind_contexts",
"stub_hex_prefix",
])
for row in rows:
hits: list[str] = []
if scalar and row.stub:
start = 0
while True:
hit = row.stub.find(scalar, start)
if hit < 0:
break
hits.append(str(hit))
start = hit + 1
writer.writerow([
row.stream,
row.offset,
row.ptype,
PTYPE_NAMES.get(row.ptype, str(row.ptype)),
f"0x{row.flags:02x}",
row.frag_len,
row.auth_len,
row.call_id,
"" if row.alloc_hint is None else row.alloc_hint,
"" if row.context_id is None else row.context_id,
row.context_interface_uuid,
row.context_interface_version,
"" if row.opnum is None else row.opnum,
"" if row.stub_offset is None else row.stub_offset,
row.stub_len,
",".join(hits),
row.bind_contexts,
row.stub[:64].hex(" "),
])
print(f"wrote {out_path} pdus={len(rows)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,82 @@
"""Probe NmxSvc.NmxService through raw DCOM without the AVEVA x86 proxy.
Credentials are read from MX_RPC_USER, MX_RPC_PASSWORD, and MX_RPC_DOMAIN.
Do not put passwords in this file or in checked-in probe output.
"""
from __future__ import annotations
import os
import traceback
from impacket.dcerpc.v5 import dcomrt, rpcrt
from impacket.dcerpc.v5.dtypes import ULONG
from impacket.uuid import string_to_bin, uuidtup_to_bin
CLSID_NMX_SERVICE = string_to_bin("AE24BD51-2E80-44CC-905B-E5446C942BEB")
IID_INMXSERVICE2 = string_to_bin("2630A513-A974-4B1A-8025-457A9A7C56B8")
IID_INMXSERVICE2_BIND = uuidtup_to_bin(("2630A513-A974-4B1A-8025-457A9A7C56B8", "0.0"))
class GetPartnerVersion(dcomrt.DCOMCALL):
opnum = 11
structure = (
("lPartnerGalaxyId", ULONG),
("lPartnerPlatformId", ULONG),
("lPartnerEngineId", ULONG),
)
class GetPartnerVersionResponse(dcomrt.DCOMANSWER):
structure = (
("plPartnerVersion", ULONG),
("ErrorCode", dcomrt.error_status_t),
)
def main() -> int:
target = os.getenv("MX_RPC_TARGET", os.environ.get("COMPUTERNAME", "127.0.0.1"))
user = os.getenv("MX_RPC_USER", "")
password = os.getenv("MX_RPC_PASSWORD", "")
domain = os.getenv("MX_RPC_DOMAIN", "")
dcom = None
try:
dcom = dcomrt.DCOMConnection(
target,
username=user,
password=password,
domain=domain,
authLevel=rpcrt.RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
)
iface = dcom.CoCreateInstanceEx(CLSID_NMX_SERVICE, IID_INMXSERVICE2)
print("cocreate_ok")
print("target=" + iface.get_target())
print("oxid=0x%016x" % iface.get_oxid())
print("oid=0x%016x" % iface.get_oid())
print("ipid=" + iface.get_iPid().hex())
print("ipidRemUnknown=" + iface.get_ipidRemUnknown().hex())
print("bindings=" + "|".join(binding["aNetworkAddr"].rstrip("\x00") for binding in iface.get_cinstance().get_string_bindings()))
request = GetPartnerVersion()
request["lPartnerGalaxyId"] = int(os.getenv("MX_PARTNER_GALAXY", "1"), 0)
request["lPartnerPlatformId"] = int(os.getenv("MX_PARTNER_PLATFORM", "1"), 0)
request["lPartnerEngineId"] = int(os.getenv("MX_PARTNER_ENGINE", "0x7ffd"), 0)
response = iface.request(request, IID_INMXSERVICE2_BIND, iface.get_iPid())
print("get_partner_version_ok")
print("partner_version=%d" % response["plPartnerVersion"])
print("error_code=0x%08x" % response["ErrorCode"])
return 0
except Exception as exc: # noqa: BLE001 - probe should print exact failure class.
print("probe_error=%s: %s" % (type(exc).__name__, exc))
traceback.print_exc()
return 1
finally:
if dcom is not None:
dcom.disconnect()
if __name__ == "__main__":
raise SystemExit(main())
+78
View File
@@ -0,0 +1,78 @@
from __future__ import annotations
import argparse
import socket
import struct
import uuid
IOBJECT_EXPORTER = uuid.UUID("99fcfec4-5260-101b-bbcb-00aa0021347a")
NDR20 = uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860")
def uuid_le(value: uuid.UUID) -> bytes:
return value.bytes_le
def pdu_header(ptype: int, flags: int, frag_len: int, call_id: int) -> bytes:
return struct.pack("<BBBBLHHI", 5, 0, ptype, flags, 0x10, frag_len, 0, call_id)
def bind_pdu(call_id: int = 1) -> bytes:
body = struct.pack("<HHIBBBB", 4280, 4280, 0, 1, 0, 0, 0)
body += struct.pack("<HBB", 0, 1, 0)
body += uuid_le(IOBJECT_EXPORTER) + struct.pack("<HH", 0, 0)
body += uuid_le(NDR20) + struct.pack("<HH", 2, 0)
return pdu_header(11, 3, 16 + len(body), call_id) + body
def request_pdu(call_id: int, opnum: int, stub: bytes) -> bytes:
body = struct.pack("<IHH", len(stub), 0, opnum) + stub
return pdu_header(0, 3, 16 + len(body), call_id) + body
def encode_resolve_oxid_stub(oxid: int, protseqs: list[int]) -> bytes:
# IObjectExporter::ResolveOxid, opnum 0:
# OXID pOxid; unsigned short cRequestedProtseqs;
# conformant ushort array arRequestedProtseqs.
body = struct.pack("<QH", oxid, len(protseqs))
body += b"\x00\x00"
body += struct.pack("<I", len(protseqs))
body += b"".join(struct.pack("<H", protseq) for protseq in protseqs)
if len(body) % 4:
body += b"\x00" * (4 - len(body) % 4)
return body
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=135)
parser.add_argument("--oxid", required=True, help="hex OXID, for example 0xEAF0D2B53BAB5BC2")
parser.add_argument("--protseq", type=lambda text: int(text, 0), action="append", default=[7])
args = parser.parse_args()
oxid = int(args.oxid, 0)
stub = encode_resolve_oxid_stub(oxid, args.protseq)
print(f"resolve_oxid_stub={stub.hex(' ')}")
with socket.create_connection((args.host, args.port), timeout=5) as sock:
sock.sendall(bind_pdu())
bind_response = sock.recv(4096)
print(f"bind_response={bind_response.hex(' ')}")
sock.sendall(request_pdu(2, 0, stub))
response = sock.recv(8192)
print(f"resolve_response={response.hex(' ')}")
if len(response) >= 32 and response[2] == 2:
stub_data = response[24:]
print(f"resolve_response_stub={stub_data.hex(' ')}")
if len(stub_data) >= 4:
print(f"likely_error_status=0x{int.from_bytes(stub_data[-4:], 'little'):08x}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,48 @@
param(
[string]$Configuration = 'Release'
)
$ErrorActionPreference = 'Stop'
$Root = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$Project = Join-Path $Root 'src\NmxComHarness\NmxComHarness.csproj'
$Assembly = Join-Path $Root "src\NmxComHarness\bin\$Configuration\net481\NmxComHarness.exe"
$TypeLibDir = Join-Path $Root 'analysis\proxy\typelib'
$TypeLib = Join-Path $TypeLibDir 'NmxComHarness.tlb'
$TlbExp = 'C:\Program Files (x86)\Microsoft SDKs\Windows\v10.0A\bin\NETFX 4.8.1 Tools\x64\TlbExp.exe'
if (-not (Test-Path $TlbExp)) {
throw "TlbExp.exe was not found at $TlbExp"
}
dotnet build $Project -c $Configuration | Out-Host
New-Item -ItemType Directory -Force $TypeLibDir | Out-Null
& $TlbExp $Assembly "/out:$TypeLib" | Tee-Object -FilePath (Join-Path $TypeLibDir 'tlbexp-output.txt')
$code = @'
using System;
using System.Runtime.InteropServices;
public static class TypeLibReg {
[DllImport("oleaut32.dll", CharSet=CharSet.Unicode)]
public static extern int LoadTypeLibEx(string szFile, int regkind, out IntPtr pptlib);
}
'@
Add-Type $code
[IntPtr]$typeLibPointer = [IntPtr]::Zero
$hr = [TypeLibReg]::LoadTypeLibEx($TypeLib, 1, [ref]$typeLibPointer)
"LoadTypeLibEx(Register) hr=0x{0:X8} ptr={1}" -f ($hr -band 0xffffffff), $typeLibPointer |
Tee-Object -FilePath (Join-Path $TypeLibDir 'register-typelib-output.txt')
if ($hr -ne 0) {
throw "LoadTypeLibEx failed with HRESULT 0x{0:X8}" -f ($hr -band 0xffffffff)
}
$InterfaceKey = 'HKLM:\SOFTWARE\Classes\Interface\{B49F92F7-C748-4169-8ECA-A0670B012746}'
New-Item -Path (Join-Path $InterfaceKey 'NumMethods') -Force | Out-Null
Set-ItemProperty -Path (Join-Path $InterfaceKey 'NumMethods') -Name '(default)' -Value '5'
Get-ItemProperty $InterfaceKey
Get-ChildItem $InterfaceKey -Recurse | ForEach-Object {
$_.Name
Get-ItemProperty $_.PSPath
}
@@ -0,0 +1,68 @@
param(
[Parameter(Mandatory = $true)]
[string]$Name,
[Parameter(Mandatory = $true)]
[string[]]$HarnessArgs
)
$ErrorActionPreference = 'Stop'
$Root = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$Dumpcap = 'C:\Program Files\Wireshark\dumpcap.exe'
$Harness = Join-Path $Root 'src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe'
$FridaScript = Join-Path $Root 'analysis\frida\mx-nmx-trace.js'
$CaptureDir = Join-Path $Root "captures\$Name"
New-Item -ItemType Directory -Force $CaptureDir | Out-Null
$Pcap = Join-Path $CaptureDir 'loopback.pcapng'
$HarnessLog = Join-Path $CaptureDir 'harness.log'
$FridaOut = Join-Path $CaptureDir 'frida.stdout.jsonl'
$FridaErr = Join-Path $CaptureDir 'frida.stderr.txt'
$DumpOut = Join-Path $CaptureDir 'dumpcap.stdout.txt'
$DumpErr = Join-Path $CaptureDir 'dumpcap.stderr.txt'
$CommandFile = Join-Path $CaptureDir 'command.txt'
foreach ($Path in @($Pcap, $HarnessLog, $FridaOut, $FridaErr, $DumpOut, $DumpErr)) {
if (Test-Path $Path) { Remove-Item -LiteralPath $Path -Force }
}
$FullHarnessArgs = @($HarnessArgs + @("--log=$HarnessLog", "--client=MxFridaLoopback-$Name"))
$Frida = (Get-Command frida.exe -ErrorAction Stop).Source
$FridaArguments = @('-f', $Harness, '-l', $FridaScript, '--') + $FullHarnessArgs
"dumpcap=$Dumpcap" | Out-File -Encoding UTF8 $CommandFile
"frida=$Frida" | Out-File -Encoding UTF8 -Append $CommandFile
"harness=$Harness" | Out-File -Encoding UTF8 -Append $CommandFile
("args=" + ($FridaArguments -join ' ')) | Out-File -Encoding UTF8 -Append $CommandFile
$Dump = Start-Process -FilePath $Dumpcap `
-ArgumentList @('-i', '\Device\NPF_Loopback', '-w', $Pcap, '-q') `
-PassThru `
-WindowStyle Hidden `
-RedirectStandardOutput $DumpOut `
-RedirectStandardError $DumpErr
Start-Sleep -Seconds 2
try {
$Process = Start-Process -FilePath $Frida `
-ArgumentList $FridaArguments `
-Wait `
-PassThru `
-NoNewWindow `
-RedirectStandardOutput $FridaOut `
-RedirectStandardError $FridaErr
"exit_code=$($Process.ExitCode)" | Out-File -Encoding UTF8 (Join-Path $CaptureDir 'frida-exit.txt')
}
finally {
if (-not $Dump.HasExited) {
Stop-Process -Id $Dump.Id -Force -ErrorAction SilentlyContinue
}
Wait-Process -Id $Dump.Id -Timeout 10 -ErrorAction SilentlyContinue | Out-Null
}
Get-ChildItem $CaptureDir | Select-Object Name, Length, LastWriteTime
+45
View File
@@ -0,0 +1,45 @@
param(
[Parameter(Mandatory = $true)]
[string]$Name,
[Parameter(Mandatory = $true)]
[string[]]$HarnessArgs
)
$ErrorActionPreference = 'Stop'
$Root = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$Harness = Join-Path $Root 'src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe'
$Script = Join-Path $Root 'analysis\frida\mx-nmx-trace.js'
$TraceDir = Join-Path $Root "captures\$Name"
New-Item -ItemType Directory -Force $TraceDir | Out-Null
$HarnessLog = Join-Path $TraceDir 'harness.log'
$FridaOut = Join-Path $TraceDir 'frida.stdout.jsonl'
$FridaErr = Join-Path $TraceDir 'frida.stderr.txt'
$CommandFile = Join-Path $TraceDir 'frida-command.txt'
if (Test-Path $HarnessLog) { Remove-Item -LiteralPath $HarnessLog -Force }
if (Test-Path $FridaOut) { Remove-Item -LiteralPath $FridaOut -Force }
if (Test-Path $FridaErr) { Remove-Item -LiteralPath $FridaErr -Force }
$FullHarnessArgs = @($HarnessArgs + @("--log=$HarnessLog", "--client=MxFridaTrace-$Name"))
$Frida = (Get-Command frida.exe -ErrorAction Stop).Source
$ArgumentList = @('-f', $Harness, '-l', $Script, '--') + $FullHarnessArgs
"frida=$Frida" | Out-File -Encoding UTF8 $CommandFile
"harness=$Harness" | Out-File -Encoding UTF8 -Append $CommandFile
("args=" + ($ArgumentList -join ' ')) | Out-File -Encoding UTF8 -Append $CommandFile
$Process = Start-Process -FilePath $Frida `
-ArgumentList $ArgumentList `
-Wait `
-PassThru `
-NoNewWindow `
-RedirectStandardOutput $FridaOut `
-RedirectStandardError $FridaErr
"exit_code=$($Process.ExitCode)" | Out-File -Encoding UTF8 (Join-Path $TraceDir 'frida-exit.txt')
Get-ChildItem $TraceDir | Select-Object Name, Length, LastWriteTime
+59
View File
@@ -0,0 +1,59 @@
param(
[Parameter(Mandatory = $true)]
[string]$Name,
[Parameter(Mandatory = $true)]
[string[]]$HarnessArgs
)
$ErrorActionPreference = 'Stop'
$Root = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$Dumpcap = 'C:\Program Files\Wireshark\dumpcap.exe'
$Harness = Join-Path $Root 'src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe'
$CaptureDir = Join-Path $Root "captures\$Name"
New-Item -ItemType Directory -Force $CaptureDir | Out-Null
$Pcap = Join-Path $CaptureDir 'loopback.pcapng'
$Log = Join-Path $CaptureDir 'harness.log'
$DumpOut = Join-Path $CaptureDir 'dumpcap.stdout.txt'
$DumpErr = Join-Path $CaptureDir 'dumpcap.stderr.txt'
if (Test-Path $Pcap) { Remove-Item -LiteralPath $Pcap -Force }
if (Test-Path $Log) { Remove-Item -LiteralPath $Log -Force }
$FullHarnessArgs = @($HarnessArgs + @("--log=$Log", "--client=MxProtoTraceHarness-$Name"))
"harness=$Harness" | Out-File -Encoding UTF8 (Join-Path $CaptureDir 'command.txt')
("args=" + ($FullHarnessArgs -join ' ')) | Out-File -Encoding UTF8 -Append (Join-Path $CaptureDir 'command.txt')
$Dump = Start-Process -FilePath $Dumpcap `
-ArgumentList @('-i', '\Device\NPF_Loopback', '-w', $Pcap, '-q') `
-PassThru `
-WindowStyle Hidden `
-RedirectStandardOutput $DumpOut `
-RedirectStandardError $DumpErr
Start-Sleep -Seconds 2
try {
$Process = Start-Process -FilePath $Harness `
-ArgumentList $FullHarnessArgs `
-Wait `
-PassThru `
-NoNewWindow `
-RedirectStandardOutput (Join-Path $CaptureDir 'stdout.txt') `
-RedirectStandardError (Join-Path $CaptureDir 'stderr.txt')
"exit_code=$($Process.ExitCode)" | Out-File -Encoding UTF8 (Join-Path $CaptureDir 'exit.txt')
}
finally {
if (-not $Dump.HasExited) {
Stop-Process -Id $Dump.Id -Force -ErrorAction SilentlyContinue
}
Wait-Process -Id $Dump.Id -Timeout 10 -ErrorAction SilentlyContinue | Out-Null
}
Get-ChildItem $CaptureDir | Select-Object Name, Length, LastWriteTime
@@ -0,0 +1,102 @@
param(
[Parameter(Mandatory = $true)]
[string]$Name,
[Parameter(Mandatory = $true)]
[string[]]$HarnessArgs,
[switch]$NoLoopback
)
$ErrorActionPreference = 'Stop'
$Root = Resolve-Path (Join-Path $PSScriptRoot '..\..')
$Dumpcap = 'C:\Program Files\Wireshark\dumpcap.exe'
$Harness = Join-Path $Root 'src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe'
$ClientFridaScript = Join-Path $Root 'analysis\frida\mx-nmx-trace.js'
$ServiceFridaScript = Join-Path $Root 'analysis\frida\nmxsvc-trace.js'
$CaptureDir = Join-Path $Root "captures\$Name"
New-Item -ItemType Directory -Force $CaptureDir | Out-Null
$Pcap = Join-Path $CaptureDir 'loopback.pcapng'
$HarnessLog = Join-Path $CaptureDir 'harness.log'
$ClientFridaOut = Join-Path $CaptureDir 'client-frida.stdout.jsonl'
$ClientFridaErr = Join-Path $CaptureDir 'client-frida.stderr.txt'
$ServiceFridaOut = Join-Path $CaptureDir 'service-frida.stdout.jsonl'
$ServiceFridaErr = Join-Path $CaptureDir 'service-frida.stderr.txt'
$DumpOut = Join-Path $CaptureDir 'dumpcap.stdout.txt'
$DumpErr = Join-Path $CaptureDir 'dumpcap.stderr.txt'
$CommandFile = Join-Path $CaptureDir 'command.txt'
foreach ($Path in @($Pcap, $HarnessLog, $ClientFridaOut, $ClientFridaErr, $ServiceFridaOut, $ServiceFridaErr, $DumpOut, $DumpErr)) {
if (Test-Path $Path) { Remove-Item -LiteralPath $Path -Force }
}
$NmxSvc = Get-Process NmxSvc -ErrorAction Stop | Select-Object -First 1
$Frida = (Get-Command frida.exe -ErrorAction Stop).Source
$FullHarnessArgs = @($HarnessArgs + @("--log=$HarnessLog", "--client=MxBoundary-$Name"))
$ClientFridaArguments = @('-f', $Harness, '-l', $ClientFridaScript, '--') + $FullHarnessArgs
$ServiceFridaArguments = @('-p', "$($NmxSvc.Id)", '-l', $ServiceFridaScript)
"nmxsvc_pid=$($NmxSvc.Id)" | Out-File -Encoding UTF8 $CommandFile
"nmxsvc_path=$($NmxSvc.Path)" | Out-File -Encoding UTF8 -Append $CommandFile
"dumpcap=$Dumpcap" | Out-File -Encoding UTF8 -Append $CommandFile
"frida=$Frida" | Out-File -Encoding UTF8 -Append $CommandFile
"harness=$Harness" | Out-File -Encoding UTF8 -Append $CommandFile
("service_args=" + ($ServiceFridaArguments -join ' ')) | Out-File -Encoding UTF8 -Append $CommandFile
("client_args=" + ($ClientFridaArguments -join ' ')) | Out-File -Encoding UTF8 -Append $CommandFile
$Dump = $null
$ServiceFrida = $null
try {
$ServiceFrida = Start-Process -FilePath $Frida `
-ArgumentList $ServiceFridaArguments `
-PassThru `
-WindowStyle Hidden `
-RedirectStandardOutput $ServiceFridaOut `
-RedirectStandardError $ServiceFridaErr
Start-Sleep -Seconds 3
if (-not $NoLoopback -and (Test-Path $Dumpcap)) {
$Dump = Start-Process -FilePath $Dumpcap `
-ArgumentList @('-i', '\Device\NPF_Loopback', '-w', $Pcap, '-q') `
-PassThru `
-WindowStyle Hidden `
-RedirectStandardOutput $DumpOut `
-RedirectStandardError $DumpErr
Start-Sleep -Seconds 2
}
$ClientFrida = Start-Process -FilePath $Frida `
-ArgumentList $ClientFridaArguments `
-Wait `
-PassThru `
-NoNewWindow `
-RedirectStandardOutput $ClientFridaOut `
-RedirectStandardError $ClientFridaErr
"client_exit_code=$($ClientFrida.ExitCode)" | Out-File -Encoding UTF8 (Join-Path $CaptureDir 'client-frida-exit.txt')
}
finally {
if ($ServiceFrida -ne $null -and -not $ServiceFrida.HasExited) {
Stop-Process -Id $ServiceFrida.Id -Force -ErrorAction SilentlyContinue
}
if ($Dump -ne $null -and -not $Dump.HasExited) {
Stop-Process -Id $Dump.Id -Force -ErrorAction SilentlyContinue
}
if ($ServiceFrida -ne $null) {
Wait-Process -Id $ServiceFrida.Id -Timeout 10 -ErrorAction SilentlyContinue | Out-Null
}
if ($Dump -ne $null) {
Wait-Process -Id $Dump.Id -Timeout 10 -ErrorAction SilentlyContinue | Out-Null
}
}
Get-ChildItem $CaptureDir | Select-Object Name, Length, LastWriteTime
+26
View File
@@ -0,0 +1,26 @@
param(
[string]$UserName = "dohertj2",
[string]$Domain = $env:COMPUTERNAME
)
$securePassword = Read-Host -Prompt "MX RPC password" -AsSecureString
$bstr = [Runtime.InteropServices.Marshal]::SecureStringToBSTR($securePassword)
try {
$plainPassword = [Runtime.InteropServices.Marshal]::PtrToStringBSTR($bstr)
[Environment]::SetEnvironmentVariable("MX_RPC_USER", $UserName, "User")
[Environment]::SetEnvironmentVariable("MX_RPC_DOMAIN", $Domain, "User")
[Environment]::SetEnvironmentVariable("MX_RPC_PASSWORD", $plainPassword, "User")
$env:MX_RPC_USER = $UserName
$env:MX_RPC_DOMAIN = $Domain
$env:MX_RPC_PASSWORD = $plainPassword
Write-Host "MX_RPC_USER and MX_RPC_DOMAIN set for user environment and current shell."
Write-Host "MX_RPC_PASSWORD set for user environment and current shell."
Write-Host "Start a new Codex/shell session if a child process does not inherit the updated user environment."
}
finally {
if ($bstr -ne [IntPtr]::Zero) {
[Runtime.InteropServices.Marshal]::ZeroFreeBSTR($bstr)
}
}
+74
View File
@@ -0,0 +1,74 @@
from __future__ import annotations
import argparse
import csv
from collections import Counter
from pathlib import Path
HEADER = [
"capture",
"stream",
"packet_type",
"context_id",
"opnum",
"count",
"frag_lengths",
]
def summarize(path: Path) -> list[list[str]]:
rows: list[list[str]] = []
counts: Counter[tuple[str, str, str, str]] = Counter()
lengths: dict[tuple[str, str, str, str], Counter[str]] = {}
with path.open("r", encoding="utf-8-sig", newline="") as handle:
reader = csv.reader(handle, delimiter="\t")
for fields in reader:
if len(fields) < 10:
continue
stream = fields[2]
packet_type = fields[3]
context_id = fields[5]
opnum = fields[6]
frag_len = fields[8]
key = (stream, packet_type, context_id, opnum)
counts[key] += 1
lengths.setdefault(key, Counter())[frag_len] += 1
for key, count in sorted(counts.items(), key=lambda item: (-item[1], item[0])):
stream, packet_type, context_id, opnum = key
frag_lengths = ",".join(
f"{length}:{length_count}"
for length, length_count in sorted(lengths[key].items(), key=lambda item: (item[0], item[1]))
)
rows.append([path.parent.name, stream, packet_type, context_id, opnum, str(count), frag_lengths])
return rows
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("dcerpc_tsv", type=Path, nargs="+")
parser.add_argument("--out", type=Path)
args = parser.parse_args()
output_rows = [HEADER]
for path in args.dcerpc_tsv:
output_rows.extend(summarize(path))
if args.out:
args.out.parent.mkdir(parents=True, exist_ok=True)
with args.out.open("w", encoding="utf-8", newline="") as handle:
writer = csv.writer(handle, delimiter="\t", lineterminator="\n")
writer.writerows(output_rows)
else:
writer = csv.writer(__import__("sys").stdout, delimiter="\t", lineterminator="\n")
writer.writerows(output_rows)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+102
View File
@@ -0,0 +1,102 @@
from __future__ import annotations
import argparse
from collections import Counter, defaultdict
from pathlib import Path
from scapy.all import IP, IPv6, TCP, UDP, Raw, rdpcap
def packet_endpoint(packet):
if IP in packet:
network = packet[IP]
elif IPv6 in packet:
network = packet[IPv6]
else:
return None
if TCP in packet:
tcp = packet[TCP]
proto = "TCP"
sport = int(tcp.sport)
dport = int(tcp.dport)
elif UDP in packet:
udp = packet[UDP]
proto = "UDP"
sport = int(udp.sport)
dport = int(udp.dport)
else:
return None
payload_len = len(bytes(packet[Raw].load)) if Raw in packet else 0
return proto, str(network.src), sport, str(network.dst), dport, payload_len
def summarize(path: Path) -> list[str]:
packets = rdpcap(str(path))
endpoint_counts: Counter[tuple[str, str, int, str, int]] = Counter()
endpoint_payloads: defaultdict[tuple[str, str, int, str, int], list[int]] = defaultdict(list)
port_counts: Counter[tuple[str, int]] = Counter()
raw_payload_counter = 0
for packet in packets:
endpoint = packet_endpoint(packet)
if endpoint is None:
continue
proto, src, sport, dst, dport, payload_len = endpoint
key = (proto, src, sport, dst, dport)
endpoint_counts[key] += 1
endpoint_payloads[key].append(payload_len)
port_counts[(proto, sport)] += 1
port_counts[(proto, dport)] += 1
if payload_len:
raw_payload_counter += 1
lines = [
f"capture\t{path}",
f"packets\t{len(packets)}",
f"ip_tcp_udp_packets\t{sum(endpoint_counts.values())}",
f"packets_with_payload\t{raw_payload_counter}",
"",
"top_ports",
"proto\tport\tpacket_refs",
]
for (proto, port), count in port_counts.most_common(20):
lines.append(f"{proto}\t{port}\t{count}")
lines.extend(["", "top_endpoints", "proto\tsrc\tsport\tdst\tdport\tpackets\tpayload_packets\tpayload_bytes"])
for key, count in endpoint_counts.most_common(30):
payloads = endpoint_payloads[key]
payload_packets = sum(1 for length in payloads if length)
payload_bytes = sum(payloads)
proto, src, sport, dst, dport = key
lines.append(f"{proto}\t{src}\t{sport}\t{dst}\t{dport}\t{count}\t{payload_packets}\t{payload_bytes}")
return lines
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("pcapng", nargs="+", type=Path)
parser.add_argument("--out", type=Path)
args = parser.parse_args()
output: list[str] = []
for index, pcapng in enumerate(args.pcapng):
if index:
output.append("")
output.append("=" * 80)
output.append("")
output.extend(summarize(pcapng))
text = "\n".join(output) + "\n"
if args.out:
args.out.parent.mkdir(parents=True, exist_ok=True)
args.out.write_text(text, encoding="utf-8")
else:
print(text, end="")
return 0
if __name__ == "__main__":
raise SystemExit(main())