fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Layout:
- src/ .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
MxAsbClient, probes, tests, harnesses. Executable spec.
- design/ Architectural plan for the Rust port (M0–M6), error
model, protocol invariants, risks (R1–R16), adversarial
review log (review.md).
- rust/ Rust workspace. M0 skeleton + M1 codec parity.
mxaccess-codec: 215 unit tests + 2 cross-implementation
parity tests (byte-identical against .NET reference).
Other crates are M0 stubs awaiting M2+.
- captures/ Frida + netsh + pcap evidence per CLAUDE.md
("captures are evidence, not throwaway logs").
- analysis/ Decompiled C# (frida/proxy/decompiled-*),
Ghidra exports for native DLLs (`exports/` only —
working state at `projects/` and AVEVA's input
binaries at `input/` are gitignored).
- docs/ Reverse-engineering reference docs.
- tools/ Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/ Rust CI: fmt + build + test + clippy on Windows.
- LICENSE MIT (Joseph Doherty, 2026).
Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly
Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
201 lines
7.8 KiB
Python
201 lines
7.8 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import datetime as dt
|
|
import json
|
|
import re
|
|
import struct
|
|
from pathlib import Path
|
|
|
|
|
|
JSON_RE = re.compile(r"(\{.*\})")
|
|
HARNESS_RE = re.compile(r"^(?P<timestamp>\S+)\t(?P<event>[^\t]+)\t(?P<payload>.*)$")
|
|
|
|
|
|
def iter_json_events(path: Path):
|
|
for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
|
|
match = JSON_RE.search(line)
|
|
if not match:
|
|
continue
|
|
try:
|
|
yield json.loads(match.group(1))
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
|
|
def harness_write_values(path: Path) -> tuple[str, list[str]]:
|
|
write_type = ""
|
|
values: list[str] = []
|
|
for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
|
|
match = HARNESS_RE.match(line)
|
|
if not match:
|
|
continue
|
|
payload = json.loads(match.group("payload"))
|
|
if match.group("event") == "harness.start":
|
|
write_type = str(payload.get("WriteType", ""))
|
|
continue
|
|
if match.group("event") != "mx.write.begin":
|
|
continue
|
|
value = payload.get("Value", {}).get("Value")
|
|
if value is not None:
|
|
values.append(str(value))
|
|
return write_type.lower(), values
|
|
|
|
|
|
def bytes_from_hex(text: str) -> bytes:
|
|
if not text:
|
|
return b""
|
|
return bytes.fromhex(text)
|
|
|
|
|
|
def value_needles(write_type: str, values: list[str]) -> list[tuple[str, bytes]]:
|
|
needles: list[tuple[str, bytes]] = []
|
|
for value in values:
|
|
if write_type in {"int", "integer", "int32"}:
|
|
needles.append((value, struct.pack("<i", int(value))))
|
|
elif write_type in {"bool", "boolean"}:
|
|
normalized = value.lower()
|
|
if normalized == "true":
|
|
needles.append((value + ":u8", b"\x01"))
|
|
needles.append((value + ":i32", struct.pack("<i", 1)))
|
|
needles.append((value + ":variant_bool", struct.pack("<h", -1)))
|
|
else:
|
|
needles.append((value + ":u8", b"\x00"))
|
|
needles.append((value + ":i32", struct.pack("<i", 0)))
|
|
needles.append((value + ":variant_bool", struct.pack("<h", 0)))
|
|
elif write_type in {"float", "single"}:
|
|
needles.append((value + ":f32", struct.pack("<f", float(value))))
|
|
elif write_type == "double":
|
|
needles.append((value + ":f64", struct.pack("<d", float(value))))
|
|
elif write_type == "string":
|
|
needles.append((value + ":utf16le", value.encode("utf-16le")))
|
|
needles.append((value + ":utf8", value.encode("utf-8")))
|
|
elif write_type in {"datetime", "time"}:
|
|
parsed = parse_datetime(value)
|
|
if parsed.tzinfo is None:
|
|
local_parsed = parsed
|
|
parsed = parsed.replace(tzinfo=dt.datetime.now().astimezone().tzinfo)
|
|
else:
|
|
local_parsed = parsed.replace(tzinfo=None)
|
|
utc = parsed.astimezone(dt.timezone.utc)
|
|
ole_epoch = dt.datetime(1899, 12, 30, tzinfo=dt.timezone.utc)
|
|
ole_days = (utc - ole_epoch).total_seconds() / 86400.0
|
|
ole_local_epoch = dt.datetime(1899, 12, 30)
|
|
ole_local_days = (local_parsed - ole_local_epoch).total_seconds() / 86400.0
|
|
filetime_epoch = dt.datetime(1601, 1, 1, tzinfo=dt.timezone.utc)
|
|
filetime_ticks = int((utc - filetime_epoch).total_seconds() * 10_000_000)
|
|
needles.append((value + ":oadate", struct.pack("<d", ole_days)))
|
|
needles.append((value + ":oadate-local", struct.pack("<d", ole_local_days)))
|
|
needles.append((value + ":filetime", struct.pack("<Q", filetime_ticks)))
|
|
needles.append((value + ":utf16le", value.encode("utf-16le")))
|
|
date_texts = {
|
|
local_parsed.strftime("%m/%d/%Y %H:%M:%S"),
|
|
local_parsed.strftime("%#m/%#d/%Y %#I:%M:%S %p") if hasattr(local_parsed, "strftime") else "",
|
|
}
|
|
for text in date_texts:
|
|
if text:
|
|
needles.append((text + ":utf16le", text.encode("utf-16le")))
|
|
else:
|
|
needles.append((value + ":utf16le", value.encode("utf-16le")))
|
|
needles.append((value + ":utf8", value.encode("utf-8")))
|
|
return [(label, needle) for label, needle in needles if needle]
|
|
|
|
|
|
def parse_datetime(value: str) -> dt.datetime:
|
|
try:
|
|
return dt.datetime.fromisoformat(value)
|
|
except ValueError:
|
|
pass
|
|
for fmt in ("%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p"):
|
|
try:
|
|
return dt.datetime.strptime(value, fmt)
|
|
except ValueError:
|
|
continue
|
|
raise ValueError(f"Unsupported datetime value {value!r}")
|
|
|
|
|
|
def value_hits(data: bytes, needles: list[tuple[str, bytes]]) -> str:
|
|
hits: list[str] = []
|
|
for label, needle in needles:
|
|
start = 0
|
|
while True:
|
|
offset = data.find(needle, start)
|
|
if offset < 0:
|
|
break
|
|
hits.append(f"{label}@{offset}")
|
|
start = offset + 1
|
|
return " ".join(hits)
|
|
|
|
|
|
def write_event_rows(events: list[dict], needles: list[tuple[str, bytes]], out: Path) -> None:
|
|
out.parent.mkdir(parents=True, exist_ok=True)
|
|
header = [
|
|
"time",
|
|
"event",
|
|
"module",
|
|
"name",
|
|
"ecx",
|
|
"retval",
|
|
"args",
|
|
"candidate_index",
|
|
"candidate_size",
|
|
"candidate_ptr",
|
|
"value_hits",
|
|
"hex",
|
|
]
|
|
with out.open("w", encoding="utf-8", newline="") as handle:
|
|
writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n")
|
|
writer.writeheader()
|
|
for event in events:
|
|
candidates = event.get("candidates") or [None]
|
|
for index, candidate in enumerate(candidates):
|
|
data = b""
|
|
if candidate:
|
|
data = bytes_from_hex(candidate.get("hex", ""))
|
|
writer.writerow({
|
|
"time": event.get("time", ""),
|
|
"event": event.get("event", ""),
|
|
"module": event.get("module", ""),
|
|
"name": event.get("name", ""),
|
|
"ecx": event.get("ecx", ""),
|
|
"retval": event.get("retval", ""),
|
|
"args": json.dumps(event.get("args", []), separators=(",", ":")),
|
|
"candidate_index": "" if candidate is None else str(index),
|
|
"candidate_size": "" if candidate is None else str(candidate.get("size", "")),
|
|
"candidate_ptr": "" if candidate is None else candidate.get("ptr", ""),
|
|
"value_hits": value_hits(data, needles),
|
|
"hex": "" if candidate is None else candidate.get("hex", ""),
|
|
})
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("capture_dir", type=Path)
|
|
parser.add_argument("--out", type=Path)
|
|
parser.add_argument("--frida-log", type=Path)
|
|
args = parser.parse_args()
|
|
|
|
capture_dir = args.capture_dir
|
|
write_type, values = harness_write_values(capture_dir / "harness.log")
|
|
needles = value_needles(write_type, values)
|
|
frida_log = args.frida_log
|
|
if frida_log is None:
|
|
candidates = [
|
|
capture_dir / "frida.stdout.jsonl",
|
|
capture_dir / "client-frida.stdout.jsonl",
|
|
capture_dir / "service-frida.stdout.jsonl",
|
|
]
|
|
frida_log = next((path for path in candidates if path.exists()), candidates[0])
|
|
events = list(iter_json_events(frida_log))
|
|
out = args.out or (capture_dir / "frida-events.tsv")
|
|
write_event_rows(events, needles, out)
|
|
print(f"wrote {out}")
|
|
print(f"frida_log={frida_log}")
|
|
print(f"events={len(events)} write_type={write_type} write_values={','.join(values)} needles={len(needles)}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|