fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Layout:
- src/ .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
MxAsbClient, probes, tests, harnesses. Executable spec.
- design/ Architectural plan for the Rust port (M0–M6), error
model, protocol invariants, risks (R1–R16), adversarial
review log (review.md).
- rust/ Rust workspace. M0 skeleton + M1 codec parity.
mxaccess-codec: 215 unit tests + 2 cross-implementation
parity tests (byte-identical against .NET reference).
Other crates are M0 stubs awaiting M2+.
- captures/ Frida + netsh + pcap evidence per CLAUDE.md
("captures are evidence, not throwaway logs").
- analysis/ Decompiled C# (frida/proxy/decompiled-*),
Ghidra exports for native DLLs (`exports/` only —
working state at `projects/` and AVEVA's input
binaries at `input/` are gitignored).
- docs/ Reverse-engineering reference docs.
- tools/ Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/ Rust CI: fmt + build + test + clippy on Windows.
- LICENSE MIT (Joseph Doherty, 2026).
Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly
Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
212 lines
6.3 KiB
Python
212 lines
6.3 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import re
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import pefile
|
|
|
|
|
|
ASCII_RE = re.compile(rb"[\x20-\x7e]{5,}")
|
|
UTF16_RE = re.compile(rb"(?:[\x20-\x7e]\x00){5,}")
|
|
|
|
|
|
def read_c_string(data: bytes, offset: int, limit: int = 128) -> str:
|
|
chunk = data[offset:offset + limit]
|
|
end = chunk.find(b"\x00")
|
|
if end >= 0:
|
|
chunk = chunk[:end]
|
|
return chunk.decode("ascii", errors="replace")
|
|
|
|
|
|
def timestamp_text(value: int) -> str:
|
|
try:
|
|
return dt.datetime.fromtimestamp(value, tz=dt.timezone.utc).isoformat()
|
|
except Exception:
|
|
return str(value)
|
|
|
|
|
|
def hex_context(data: bytes, offset: int, size: int) -> str:
|
|
start = max(0, offset - size)
|
|
end = min(len(data), offset + size)
|
|
return data[start:end].hex(" ")
|
|
|
|
|
|
def collect_strings(data: bytes, needles: list[str]) -> list[str]:
|
|
lowered = [needle.lower() for needle in needles]
|
|
matches: list[str] = []
|
|
|
|
for raw in ASCII_RE.findall(data):
|
|
text = raw.decode("ascii", errors="replace")
|
|
if any(needle in text.lower() for needle in lowered):
|
|
matches.append(text)
|
|
|
|
for raw in UTF16_RE.findall(data):
|
|
text = raw.decode("utf-16le", errors="replace")
|
|
if any(needle in text.lower() for needle in lowered):
|
|
matches.append(text)
|
|
|
|
return sorted(set(matches))
|
|
|
|
|
|
def format_exports(pe: pefile.PE) -> list[str]:
|
|
if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
|
|
return ["(none)"]
|
|
|
|
rows = ["| Ordinal | RVA | Name |", "| ---: | ---: | --- |"]
|
|
for symbol in pe.DIRECTORY_ENTRY_EXPORT.symbols:
|
|
name = symbol.name.decode("utf-8", errors="replace") if symbol.name else ""
|
|
rows.append(f"| {symbol.ordinal} | 0x{symbol.address:08x} | `{name}` |")
|
|
return rows
|
|
|
|
|
|
def format_imports(pe: pefile.PE) -> list[str]:
|
|
if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
|
|
return ["(none)"]
|
|
|
|
lines: list[str] = []
|
|
for entry in pe.DIRECTORY_ENTRY_IMPORT:
|
|
dll = entry.dll.decode("utf-8", errors="replace")
|
|
names: list[str] = []
|
|
for imp in entry.imports:
|
|
if imp.name:
|
|
names.append(imp.name.decode("utf-8", errors="replace"))
|
|
else:
|
|
names.append(f"ord_{imp.ordinal}")
|
|
lines.append(f"- `{dll}`: " + ", ".join(f"`{name}`" for name in names))
|
|
return lines
|
|
|
|
|
|
def format_resources(pe: pefile.PE) -> list[str]:
|
|
if not hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
|
|
return ["(none)"]
|
|
|
|
rows = ["| Type | ID/name | Lang | RVA | Size |", "| --- | --- | ---: | ---: | ---: |"]
|
|
for type_entry in pe.DIRECTORY_ENTRY_RESOURCE.entries:
|
|
type_name = str(type_entry.name) if type_entry.name is not None else str(type_entry.struct.Id)
|
|
if not hasattr(type_entry, "directory"):
|
|
continue
|
|
for name_entry in type_entry.directory.entries:
|
|
name = str(name_entry.name) if name_entry.name is not None else str(name_entry.struct.Id)
|
|
if not hasattr(name_entry, "directory"):
|
|
continue
|
|
for lang_entry in name_entry.directory.entries:
|
|
data = lang_entry.data.struct
|
|
rows.append(
|
|
f"| `{type_name}` | `{name}` | {lang_entry.struct.Id} | "
|
|
f"0x{data.OffsetToData:08x} | {data.Size} |"
|
|
)
|
|
return rows
|
|
|
|
|
|
def guid_hits(data: bytes, guid_text: str) -> list[str]:
|
|
value = uuid.UUID(guid_text)
|
|
patterns = [
|
|
("text-lower", str(value).encode("ascii")),
|
|
("text-upper", str(value).upper().encode("ascii")),
|
|
("utf16-lower", str(value).encode("utf-16le")),
|
|
("utf16-upper", str(value).upper().encode("utf-16le")),
|
|
("guid-bytes-le", value.bytes_le),
|
|
("guid-bytes-be", value.bytes),
|
|
]
|
|
|
|
lines: list[str] = []
|
|
for label, pattern in patterns:
|
|
start = 0
|
|
while True:
|
|
offset = data.find(pattern, start)
|
|
if offset < 0:
|
|
break
|
|
lines.append(
|
|
f"- `{guid_text}` `{label}` at file offset `0x{offset:08x}`: "
|
|
f"`{hex_context(data, offset, 32)}`"
|
|
)
|
|
start = offset + 1
|
|
return lines
|
|
|
|
|
|
def report_one(path: Path, guids: list[str]) -> str:
|
|
data = path.read_bytes()
|
|
pe = pefile.PE(str(path), fast_load=False)
|
|
pe.parse_data_directories()
|
|
|
|
machine = pe.FILE_HEADER.Machine
|
|
bitness = "x64" if machine == 0x8664 else "x86" if machine == 0x14c else f"machine 0x{machine:04x}"
|
|
lines = [
|
|
f"# {path.name}",
|
|
"",
|
|
f"- Path: `{path}`",
|
|
f"- Size: {len(data)} bytes",
|
|
f"- Machine: {bitness}",
|
|
f"- PE timestamp: {timestamp_text(pe.FILE_HEADER.TimeDateStamp)}",
|
|
f"- ImageBase: `0x{pe.OPTIONAL_HEADER.ImageBase:08x}`",
|
|
"",
|
|
"## Exports",
|
|
"",
|
|
*format_exports(pe),
|
|
"",
|
|
"## Imports",
|
|
"",
|
|
*format_imports(pe),
|
|
"",
|
|
"## Resources",
|
|
"",
|
|
*format_resources(pe),
|
|
"",
|
|
"## GUID hits",
|
|
"",
|
|
]
|
|
|
|
hit_lines: list[str] = []
|
|
for guid in guids:
|
|
hit_lines.extend(guid_hits(data, guid))
|
|
lines.extend(hit_lines or ["(none)"])
|
|
|
|
needles = [
|
|
"ndr",
|
|
"proxy",
|
|
"stub",
|
|
"rpc",
|
|
"interface",
|
|
"nmx",
|
|
"lmx",
|
|
"putrequest",
|
|
"getresponse",
|
|
"registeritems",
|
|
"write",
|
|
*guids,
|
|
]
|
|
strings = collect_strings(data, needles)
|
|
lines.extend(["", "## Interesting strings", ""])
|
|
lines.extend(f"- `{text}`" for text in strings[:200])
|
|
if len(strings) > 200:
|
|
lines.append(f"- ... {len(strings) - 200} more")
|
|
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("pe", type=Path, nargs="+")
|
|
parser.add_argument("--guid", action="append", default=[])
|
|
parser.add_argument("--out-dir", type=Path)
|
|
args = parser.parse_args()
|
|
|
|
if args.out_dir:
|
|
args.out_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
for path in args.pe:
|
|
report = report_one(path, args.guid)
|
|
if args.out_dir:
|
|
(args.out_dir / f"{path.name}.md").write_text(report, encoding="utf-8")
|
|
else:
|
|
print(report)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|