Files
mxaccess/analysis/scripts/extract_frida_trace.py
T
Joseph Doherty fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Initial project state: .NET reference, design, Rust port (M0+M1), evidence
Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 06:21:00 -04:00

201 lines
7.8 KiB
Python

from __future__ import annotations
import argparse
import csv
import datetime as dt
import json
import re
import struct
from pathlib import Path
# Greedy match from the first "{" to the last "}" on a single line; used to
# pull a JSON object out of a log line that may carry other text around it.
JSON_RE = re.compile(r"(\{.*\})")
# One harness log line: three tab-separated fields -- a whitespace-free
# timestamp, an event name (no tabs), and the rest of the line as the payload.
HARNESS_RE = re.compile(r"^(?P<timestamp>\S+)\t(?P<event>[^\t]+)\t(?P<payload>.*)$")
def iter_json_events(path: Path):
    """Yield every JSON object found in the lines of *path*.

    The file is read as UTF-8 with undecodable bytes dropped. For each line,
    the span matched by ``JSON_RE`` is decoded; lines with no match, or whose
    match is not valid JSON, are skipped without error.
    """
    text = path.read_text(encoding="utf-8", errors="ignore")
    for raw_line in text.splitlines():
        found = JSON_RE.search(raw_line)
        if found is None:
            continue
        try:
            event = json.loads(found.group(1))
        except json.JSONDecodeError:
            # Brace-delimited text that is not JSON: ignore it.
            continue
        yield event
def harness_write_values(path: Path) -> tuple[str, list[str]]:
    """Extract the write type and the written values from a harness log.

    Only two events are inspected:

    * ``harness.start`` -- its payload's ``WriteType`` becomes the write type
      (the last such event wins).
    * ``mx.write.begin`` -- its payload's ``Value.Value`` is appended to the
      values list (as ``str``) when present and not null.

    Lines that do not match ``HARNESS_RE``, events other than the two above,
    and payloads that are not a JSON object are skipped instead of aborting
    the whole extraction.

    Returns:
        ``(write_type_lowercased, values)``; ``("", [])`` if nothing matched.
    """
    write_type = ""
    values: list[str] = []
    for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
        match = HARNESS_RE.match(line)
        if match is None:
            continue
        event = match.group("event")
        # Filter on the event name first so unrelated log lines with
        # free-form (non-JSON) payloads are never parsed at all.
        if event not in ("harness.start", "mx.write.begin"):
            continue
        try:
            payload = json.loads(match.group("payload"))
        except json.JSONDecodeError:
            # Truncated or malformed payload: skip the line, keep going.
            continue
        if not isinstance(payload, dict):
            continue
        if event == "harness.start":
            write_type = str(payload.get("WriteType", ""))
            continue
        # "Value" may be missing, null, or a scalar; only a nested object
        # can carry the inner "Value" we are after.
        wrapper = payload.get("Value")
        value = wrapper.get("Value") if isinstance(wrapper, dict) else None
        if value is not None:
            values.append(str(value))
    return write_type.lower(), values
def bytes_from_hex(text: str) -> bytes:
    """Decode a hex string into bytes; a falsy (empty) input yields ``b""``."""
    return bytes.fromhex(text) if text else b""
def value_needles(write_type: str, values: list[str]) -> list[tuple[str, bytes]]:
    """Build the byte patterns ("needles") to search for in captured buffers.

    For every written value, one or more ``(label, bytes)`` pairs are produced
    according to *write_type* (expected lowercase):

    * ``int`` / ``integer`` / ``int32`` -- little-endian signed 32-bit.
    * ``bool`` / ``boolean`` -- one byte, 32-bit int, and 16-bit
      ``variant_bool`` (-1 for true, 0 for false). Anything other than
      ``"true"`` (case-insensitive) counts as false.
    * ``float`` / ``single`` -- little-endian float32.
    * ``double`` -- little-endian float64.
    * ``string`` and any unrecognised type -- UTF-16LE and UTF-8 text.
    * ``datetime`` / ``time`` -- OLE date (UTC and local), FILETIME, and
      several textual renderings; see ``_datetime_needles``.

    Args:
        write_type: Lowercase type name selecting the encoding(s).
        values: Textual values, one per write.

    Returns:
        Needles in generation order, with empty byte patterns removed.

    Raises:
        ValueError: A value cannot be converted for the given type.
        struct.error: A numeric value does not fit the packed field.
    """
    needles: list[tuple[str, bytes]] = []
    for value in values:
        if write_type in {"int", "integer", "int32"}:
            needles.append((value, struct.pack("<i", int(value))))
        elif write_type in {"bool", "boolean"}:
            needles.extend(_bool_needles(value))
        elif write_type in {"float", "single"}:
            needles.append((value + ":f32", struct.pack("<f", float(value))))
        elif write_type == "double":
            needles.append((value + ":f64", struct.pack("<d", float(value))))
        elif write_type in {"datetime", "time"}:
            needles.extend(_datetime_needles(value, parse_datetime(value)))
        else:
            # "string" and every unknown type: search for the raw text.
            needles.append((value + ":utf16le", value.encode("utf-16le")))
            needles.append((value + ":utf8", value.encode("utf-8")))
    # An empty needle would match at every offset; drop those.
    return [(label, needle) for label, needle in needles if needle]


def _bool_needles(value: str) -> list[tuple[str, bytes]]:
    """Return the byte, int32 and variant_bool encodings of a boolean text."""
    flag = value.lower() == "true"
    return [
        (value + ":u8", b"\x01" if flag else b"\x00"),
        (value + ":i32", struct.pack("<i", int(flag))),
        (value + ":variant_bool", struct.pack("<h", -1 if flag else 0)),
    ]


def _naive_local_to_utc(naive: dt.datetime) -> dt.datetime:
    """Interpret a naive datetime as system-local time and convert it to UTC."""
    try:
        # astimezone() on a naive datetime applies the local offset that was
        # in force on that date, so daylight-saving transitions are honoured.
        return naive.astimezone(dt.timezone.utc)
    except (OSError, OverflowError, ValueError):
        # The platform's local-time conversion can reject dates outside its
        # supported range; fall back to the current local offset.
        current_zone = dt.datetime.now().astimezone().tzinfo
        return naive.replace(tzinfo=current_zone).astimezone(dt.timezone.utc)


def _datetime_needles(value: str, parsed: dt.datetime) -> list[tuple[str, bytes]]:
    """Return binary and textual needles for one parsed timestamp.

    *value* is the original text (used in labels and searched verbatim as
    UTF-16LE); *parsed* is its datetime form. A naive *parsed* is taken as
    local wall-clock time; an aware one keeps its own wall-clock fields for
    the "local" renderings.
    """
    if parsed.tzinfo is None:
        local_parsed = parsed
        utc = _naive_local_to_utc(parsed)
    else:
        local_parsed = parsed.replace(tzinfo=None)
        utc = parsed.astimezone(dt.timezone.utc)

    # OLE date: fractional days since 1899-12-30, stored as float64.
    ole_epoch = dt.datetime(1899, 12, 30, tzinfo=dt.timezone.utc)
    ole_days = (utc - ole_epoch).total_seconds() / 86400.0
    ole_local_days = (local_parsed - dt.datetime(1899, 12, 30)).total_seconds() / 86400.0

    # FILETIME: 100 ns ticks since 1601-01-01 UTC. Integer arithmetic keeps
    # sub-second digits exact; going through float seconds would round them
    # and yield a needle that cannot match.
    filetime_epoch = dt.datetime(1601, 1, 1, tzinfo=dt.timezone.utc)
    filetime_ticks = ((utc - filetime_epoch) // dt.timedelta(microseconds=1)) * 10

    needles: list[tuple[str, bytes]] = [
        (value + ":oadate", struct.pack("<d", ole_days)),
        (value + ":oadate-local", struct.pack("<d", ole_local_days)),
        (value + ":filetime", struct.pack("<Q", filetime_ticks)),
        (value + ":utf16le", value.encode("utf-16le")),
    ]

    # Textual renderings, in a fixed order so output is reproducible.
    # The unpadded 12-hour form is built by hand because the "%#m"-style
    # strftime flags that produce it are not portable across platforms.
    hour12 = local_parsed.hour % 12 or 12
    meridiem = "AM" if local_parsed.hour < 12 else "PM"
    date_texts = [
        local_parsed.strftime("%m/%d/%Y %H:%M:%S"),
        f"{local_parsed.month}/{local_parsed.day}/{local_parsed.year:04d} "
        f"{hour12}:{local_parsed.minute:02d}:{local_parsed.second:02d} {meridiem}",
    ]
    for text in date_texts:
        needles.append((text + ":utf16le", text.encode("utf-16le")))
    return needles
def parse_datetime(value: str) -> dt.datetime:
    """Parse *value* as a datetime, trying several notations in turn.

    Tried in order: ISO format (``datetime.fromisoformat``), then
    ``MM/DD/YYYY HH:MM:SS`` (24-hour), then ``MM/DD/YYYY HH:MM:SS AM|PM``.

    Raises:
        ValueError: None of the notations matched.
    """
    fallback_formats = ("%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p")
    attempts = [dt.datetime.fromisoformat]
    attempts.extend(
        # Bind the format as a default so each lambda keeps its own layout.
        lambda text, layout=layout: dt.datetime.strptime(text, layout)
        for layout in fallback_formats
    )
    for attempt in attempts:
        try:
            return attempt(value)
        except ValueError:
            continue
    raise ValueError(f"Unsupported datetime value {value!r}")
def value_hits(data: bytes, needles: list[tuple[str, bytes]]) -> str:
    """Report every position at which any needle occurs in *data*.

    Each occurrence (overlapping ones included) is rendered as
    ``label@offset``; entries are grouped by needle in the given order and
    joined with single spaces. No occurrences yields an empty string.
    """
    found: list[str] = []
    for label, needle in needles:
        position = data.find(needle)
        while position >= 0:
            found.append(f"{label}@{position}")
            # Resume one byte later so overlapping matches are reported too.
            position = data.find(needle, position + 1)
    return " ".join(found)
def write_event_rows(events: list[dict], needles: list[tuple[str, bytes]], out: Path) -> None:
    """Write the events to *out* as a tab-separated table with a header row.

    One row is emitted per entry in an event's ``candidates`` list; an event
    with no (or an empty) ``candidates`` still gets a single row whose
    candidate columns are blank. The ``value_hits`` column holds the result of
    searching the candidate's decoded ``hex`` bytes for *needles*. Parent
    directories of *out* are created as needed.
    """
    columns = [
        "time",
        "event",
        "module",
        "name",
        "ecx",
        "retval",
        "args",
        "candidate_index",
        "candidate_size",
        "candidate_ptr",
        "value_hits",
        "hex",
    ]
    out.parent.mkdir(parents=True, exist_ok=True)
    with out.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, delimiter="\t", lineterminator="\n")
        writer.writeheader()
        for event in events:
            # Columns shared by every row of this event.
            shared = {key: event.get(key, "") for key in ("time", "event", "module", "name", "ecx", "retval")}
            shared["args"] = json.dumps(event.get("args", []), separators=(",", ":"))

            # A lone None stands in for "no candidates" so one row is still written.
            for index, candidate in enumerate(event.get("candidates") or [None]):
                payload = bytes_from_hex(candidate.get("hex", "")) if candidate else b""
                if candidate is None:
                    detail = {
                        "candidate_index": "",
                        "candidate_size": "",
                        "candidate_ptr": "",
                        "hex": "",
                    }
                else:
                    detail = {
                        "candidate_index": str(index),
                        "candidate_size": str(candidate.get("size", "")),
                        "candidate_ptr": candidate.get("ptr", ""),
                        "hex": candidate.get("hex", ""),
                    }
                row = {**shared, **detail, "value_hits": value_hits(payload, needles)}
                writer.writerow(row)
def main() -> int:
    """Command-line entry point: turn a capture directory into a TSV of events.

    Reads ``harness.log`` in the capture directory for the written values,
    picks the Frida log (``--frida-log``, else the first existing default
    name, else the first default name), writes the table to ``--out`` or
    ``frida-events.tsv`` in the capture directory, prints a short summary, and
    returns 0.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("capture_dir", type=Path)
    cli.add_argument("--out", type=Path)
    cli.add_argument("--frida-log", type=Path)
    options = cli.parse_args()
    root = options.capture_dir

    write_type, values = harness_write_values(root / "harness.log")
    needles = value_needles(write_type, values)

    frida_log = options.frida_log
    if frida_log is None:
        defaults = [
            root / file_name
            for file_name in (
                "frida.stdout.jsonl",
                "client-frida.stdout.jsonl",
                "service-frida.stdout.jsonl",
            )
        ]
        # Prefer the first default that exists; otherwise keep the first name.
        frida_log = defaults[0]
        for default_path in defaults:
            if default_path.exists():
                frida_log = default_path
                break

    events = list(iter_json_events(frida_log))

    out = options.out
    if not out:
        out = root / "frida-events.tsv"
    write_event_rows(events, needles, out)

    print(f"wrote {out}")
    print(f"frida_log={frida_log}")
    print(f"events={len(events)} write_type={write_type} write_values={','.join(values)} needles={len(needles)}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())