# Convert a capture directory (harness.log plus a Frida JSONL log) into a
# tab-separated table with one row per event candidate buffer, marking the
# offsets at which the values written by the harness occur in each buffer.
#
# NOTE(review): this file was recovered from a copy in which newlines were
# collapsed and every "<...>" span was stripped. Three places were affected
# and have been reconstructed; confirm each against the pristine file:
#   1. the named groups of HARNESS_RE,
#   2. the tail of value_needles() after the int branch,
#   3. the name/signature line of parse_datetime().
from __future__ import annotations

import argparse
import csv
import datetime as dt
import json
import re
import struct
from collections.abc import Iterator
from pathlib import Path

# Greedy match from the first "{" to the last "}" on a line.
JSON_RE = re.compile(r"(\{.*\})")

# Three tab-separated fields. The "event" and "payload" group names are the
# ones the code reads; the first group's name was lost and is never read.
# NOTE(review): "time" is an assumed name for the first group.
HARNESS_RE = re.compile(r"^(?P<time>\S+)\t(?P<event>[^\t]+)\t(?P<payload>.*)$")

INT_WRITE_TYPES = frozenset({"int", "integer", "int32"})

DATETIME_FORMATS = ("%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p")

TSV_HEADER = [
    "time",
    "event",
    "module",
    "name",
    "ecx",
    "retval",
    "args",
    "candidate_index",
    "candidate_size",
    "candidate_ptr",
    "value_hits",
    "hex",
]


def iter_json_events(path: Path) -> Iterator[dict]:
    """Yield the JSON object embedded in each line of *path*.

    Lines without a ``{...}`` span, or whose span is not valid JSON, are
    skipped. Undecodable bytes in the file are ignored.
    """
    for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
        match = JSON_RE.search(line)
        if not match:
            continue
        try:
            yield json.loads(match.group(1))
        except json.JSONDecodeError:
            continue


def harness_write_values(path: Path) -> tuple[str, list[str]]:
    """Extract the write type and the written values from a harness log.

    Args:
        path: Log whose relevant lines have three tab-separated fields, the
            second being the event name and the third a JSON payload.

    Returns:
        ``(write_type, values)``: the lower-cased ``WriteType`` of the last
        ``harness.start`` event (``""`` if none) and the stringified
        ``Value.Value`` of every ``mx.write.begin`` event, in file order.

    Lines that do not match the layout, carry another event, or whose payload
    is not a JSON object are skipped, so one damaged line does not abort the
    whole run (the same policy ``iter_json_events`` applies).
    """
    write_type = ""
    values: list[str] = []
    for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
        match = HARNESS_RE.match(line)
        if not match:
            continue
        event = match.group("event")
        if event not in ("harness.start", "mx.write.begin"):
            continue
        try:
            payload = json.loads(match.group("payload"))
        except json.JSONDecodeError:
            continue
        if not isinstance(payload, dict):
            continue
        if event == "harness.start":
            write_type = str(payload.get("WriteType", ""))
            continue
        # "Value" may be absent, null or a scalar; only a nested object can
        # carry the inner "Value" this tool is interested in.
        wrapper = payload.get("Value")
        value = wrapper.get("Value") if isinstance(wrapper, dict) else None
        if value is not None:
            values.append(str(value))
    return write_type.lower(), values


def bytes_from_hex(text: str) -> bytes:
    """Decode a hex string to bytes; empty or missing text gives ``b""``.

    Raises:
        ValueError: if *text* is not valid hexadecimal.
    """
    if not text:
        return b""
    return bytes.fromhex(text)


def value_needles(write_type: str, values: list[str]) -> list[tuple[str, bytes]]:
    """Build ``(label, byte pattern)`` pairs to search for in captured buffers.

    Args:
        write_type: Lower-cased write type as returned by
            ``harness_write_values``.
        values: Written values as strings.

    Returns:
        One pair per value for the integer write types, packed as a
        little-endian signed 32-bit integer; an empty list otherwise.

    Raises:
        ValueError: if an integer value is not a valid integer literal.
        struct.error: if an integer value does not fit in 32 bits.
    """
    needles: list[tuple[str, bytes]] = []
    if write_type in INT_WRITE_TYPES:
        needles.extend((value, struct.pack("<i", int(value))) for value in values)
    # NOTE(review): the recovered copy is truncated right after the opening of
    # the int branch. Branches for other write types (the datetime parser
    # below suggests at least one) could not be recovered and must be restored
    # from the pristine file; they are deliberately not guessed here.
    return needles


# NOTE(review): the "def" line of this function was lost; the name is assumed.
# Only the "value" parameter and the "-> dt.datetime" return were recoverable.
def parse_datetime(value: str) -> dt.datetime:
    """Parse *value* as ISO 8601 or as ``MM/DD/YYYY`` with 24h or 12h time.

    Raises:
        ValueError: if none of the supported formats match.
    """
    try:
        return dt.datetime.fromisoformat(value)
    except ValueError:
        pass
    for fmt in DATETIME_FORMATS:
        try:
            return dt.datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unsupported datetime value {value!r}")


def value_hits(data: bytes, needles: list[tuple[str, bytes]]) -> str:
    """Return every (possibly overlapping) needle occurrence in *data*.

    The result is a space-separated list of ``label@offset`` entries, grouped
    by needle in the order given, offsets ascending. Empty needles are skipped
    because ``bytes.find`` reports an empty pattern at every offset.
    """
    hits: list[str] = []
    for label, needle in needles:
        if not needle:
            continue
        offset = data.find(needle)
        while offset >= 0:
            hits.append(f"{label}@{offset}")
            # Advance by one byte only, so overlapping matches are reported.
            offset = data.find(needle, offset + 1)
    return " ".join(hits)


def write_event_rows(events: list[dict], needles: list[tuple[str, bytes]], out: Path) -> None:
    """Write *events* to *out* as a tab-separated table.

    Each event produces one row per entry of its ``candidates`` list, or a
    single row with empty candidate columns when it has none. Parent
    directories of *out* are created as needed.

    Raises:
        ValueError: if a candidate's ``hex`` field is not valid hexadecimal.
    """
    out.parent.mkdir(parents=True, exist_ok=True)
    with out.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(
            handle, fieldnames=TSV_HEADER, delimiter="\t", lineterminator="\n"
        )
        writer.writeheader()
        for event in events:
            # A missing or empty candidate list still yields one row.
            candidates = event.get("candidates") or [None]
            for index, candidate in enumerate(candidates):
                data = b""
                if candidate:
                    data = bytes_from_hex(candidate.get("hex", ""))
                writer.writerow({
                    "time": event.get("time", ""),
                    "event": event.get("event", ""),
                    "module": event.get("module", ""),
                    "name": event.get("name", ""),
                    "ecx": event.get("ecx", ""),
                    "retval": event.get("retval", ""),
                    "args": json.dumps(event.get("args", []), separators=(",", ":")),
                    "candidate_index": "" if candidate is None else str(index),
                    "candidate_size": "" if candidate is None else str(candidate.get("size", "")),
                    "candidate_ptr": "" if candidate is None else candidate.get("ptr", ""),
                    "value_hits": value_hits(data, needles),
                    "hex": "" if candidate is None else candidate.get("hex", ""),
                })


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list without the program name; ``None`` uses
            ``sys.argv[1:]``.

    Returns:
        Process exit status (always 0; failures propagate as exceptions).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("capture_dir", type=Path)
    parser.add_argument("--out", type=Path)
    parser.add_argument("--frida-log", type=Path)
    args = parser.parse_args(argv)

    capture_dir = args.capture_dir
    write_type, values = harness_write_values(capture_dir / "harness.log")
    needles = value_needles(write_type, values)

    frida_log = args.frida_log
    if frida_log is None:
        candidates = [
            capture_dir / "frida.stdout.jsonl",
            capture_dir / "client-frida.stdout.jsonl",
            capture_dir / "service-frida.stdout.jsonl",
        ]
        # First existing log wins; fall back to the first name so the error
        # raised on reading names the conventional file.
        frida_log = next((path for path in candidates if path.exists()), candidates[0])

    events = list(iter_json_events(frida_log))
    out = args.out or (capture_dir / "frida-events.tsv")
    write_event_rows(events, needles, out)

    print(f"wrote {out}")
    print(f"frida_log={frida_log}")
    print(f"events={len(events)} write_type={write_type} write_values={','.join(values)} needles={len(needles)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())