"""Decode ReadMessage capture (incoming WCF response bodies).""" import base64 import json import re import sys from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent CAPTURE = REPO_ROOT / "artifacts" / "reverse-engineering" / "instrumented-wcf-readmessage" / "readmessage-capture-event-latest.ndjson" # WCF response bodies don't carry the action URI in the body itself (the request action is # echoed differently). Look for known parameter names instead. WCF response wraps the result # in <{OpName}Response> with parameter elements inside. RESPONSE_NAME_RE = re.compile(rb"[A-Za-z][A-Za-z0-9]+Response") GUID_RE = re.compile(rb"[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}") PARAM_RE = re.compile(rb"[\x20-\x7E]{4,}") def main() -> int: records = [] with CAPTURE.open(encoding="utf-8-sig") as fh: for line in fh: records.append(json.loads(line)) print(f"# {CAPTURE.name}: {len(records)} records") print() print(f"{'#':>3} {'Length':>6} {'Sha8':<10} {'Operation':<32} {'GUIDs(first2)'}") print("-" * 120) for idx, rec in enumerate(records): body = base64.b64decode(rec["Base64"]) op_match = RESPONSE_NAME_RE.search(body) op = op_match.group(0).decode() if op_match else "" guids = [g.decode() for g in GUID_RE.findall(body)] guid_summary = ", ".join(guids[:2]) if guids else "" sha8 = rec["Sha256"][:8] print(f"{idx:>3} {rec['Length']:>6} {sha8:<10} {op:<32} {guid_summary}") # Detailed dump for the most interesting records. INTEREST = { "StartEventQuery": "after request to /Retr/StartEventQuery", "GetNextEventQueryResultBuffer": "the event-row body we want", "Open2": "session-establishing response", "EnsT2": "what the server returns for EnsureTags2", "EnsureTags2": "EnsT2 alt name", "RTag2": "RegisterTags2 response", "RegisterTags2": "RTag2 alt name", "UpdC3": "UpdateClientStatus3 response", "UpdateClientStatus3": "UpdC3 alt name", } print() print("# Detailed dumps") for idx, rec in enumerate(records): body = base64.b64decode(rec["Base64"]) op_match = RESPONSE_NAME_RE.search(body) if not op_match: continue op = op_match.group(0).decode().removesuffix("Response") if not any(k in op for k in INTEREST): continue print() print(f"=== Record {idx} {op} (length={rec['Length']}) ===") for off in range(0, min(256, len(body)), 32): chunk = body[off:off + 32] hp = " ".join(f"{b:02X}" for b in chunk) ap = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) print(f" {off:04X} {hp:<96} |{ap}|") return 0 if __name__ == "__main__": sys.exit(main())