from __future__ import annotations import argparse import json import re import struct from pathlib import Path ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") def load_frida_events(path: Path) -> list[dict]: events: list[dict] = [] for raw in path.read_text(errors="replace").splitlines(): line = ANSI_RE.sub("", raw).strip() index = line.find("{") if index < 0: continue try: outer = json.loads(line[index:]) except json.JSONDecodeError: continue payload = outer.get("payload") if outer.get("type") == "send" else outer if isinstance(payload, str): try: payload = json.loads(payload) except json.JSONDecodeError: continue if isinstance(payload, dict) and payload.get("event"): events.append(payload) return events def first_candidate(event: dict, min_size: int = 20, max_size: int = 4096) -> bytes | None: for candidate in event.get("candidates") or []: hex_text = candidate.get("hex") or "" size = int(candidate.get("size") or 0) if hex_text and min_size <= size <= max_size: return bytes.fromhex(hex_text) return None def extract_needles(events: list[dict], scalar_int: int | None) -> list[tuple[str, bytes]]: start = None for index, event in enumerate(events): if event.get("name", "").startswith("CLMXProxyServer.Write") and event.get("args"): start = index break needles: list[tuple[str, bytes]] = [] if scalar_int is not None: needles.append((f"int32:{scalar_int}", struct.pack(" int: parser = argparse.ArgumentParser() parser.add_argument("capture_dir", type=Path) parser.add_argument("--scalar-int", type=int) parser.add_argument("--frida-log", type=Path) args = parser.parse_args() frida_log = args.frida_log if frida_log is None: candidates = [ args.capture_dir / "frida.stdout.jsonl", args.capture_dir / "client-frida.stdout.jsonl", ] frida_log = next((path for path in candidates if path.exists()), candidates[0]) events = load_frida_events(frida_log) needles = extract_needles(events, args.scalar_int) stream_paths = sorted(args.capture_dir.glob("tcp-stream-*.bin")) lines = ["needle\tneedle_len\tstream\toffset"] for name, needle in needles: hit_count = 0 for stream_path in stream_paths: data = stream_path.read_bytes() offset = data.find(needle) if offset >= 0: lines.append(f"{name}\t{len(needle)}\t{stream_path.name}\t{offset}") hit_count += 1 if hit_count == 0: lines.append(f"{name}\t{len(needle)}\t\t-1") output = args.capture_dir / "frida-to-tcp-map.tsv" output.write_text("\n".join(lines) + "\n", encoding="utf-8") print(f"wrote {output}") return 0 if __name__ == "__main__": raise SystemExit(main())