"""Inventory the writemessage event-flow capture: action URI + length + first bytes.

Reads the NDJSON capture at ``CAPTURE`` (one JSON object per line, each with a
``Base64`` payload and a ``Length`` field), prints a one-line summary per
record, then prints a detailed dump for the records listed in
``UNKNOWN_RECORD_INDICES``.
"""

import base64
import json
import re
import sys
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
CAPTURE = (
    REPO_ROOT
    / "artifacts"
    / "reverse-engineering"
    / "instrumented-wcf-writemessage"
    / "writemessage-capture-event-latest.ndjson"
)

# Match aa/<service>/<op> where service is Hist|Retr|Trx|Stor and op is
# alphanumeric chars.
ACTION_RE = re.compile(rb"aa/(?:Hist|Retr|Trx|Stor)/[A-Za-z0-9]+")
# 8-4-4-4-12 hex groups; only upper-case hex digits are matched.
GUID_RE = re.compile(rb"[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}")
# Runs of four or more printable ASCII bytes (space through tilde).
PARAM_NAME_RE = re.compile(rb"[\x20-\x7E]{4,}")

# ASCII runs containing any of these substrings are treated as the action URI,
# endpoint URL or other noise rather than parameter names.
_SKIP_MARKERS = ("aa/", "net.pipe://", "AVEVA", "DESKTOP-", "WORKGROUP", "NTLMSSP")
# Longest ASCII run still reported as a candidate parameter name.
_MAX_PARAM_NAME_LEN = 40

# Line indices (0-based) of the capture records that get the detailed dump.
UNKNOWN_RECORD_INDICES = frozenset({6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 18, 20})
# Size of the hex preview printed per unknown record, and bytes shown per row.
DUMP_BYTES = 160
DUMP_ROW_BYTES = 32


def first_action(body: bytes) -> str:
    """Return the first action URI found in *body*, or ``""`` if there is none."""
    m = ACTION_RE.search(body)
    return m.group(0).decode("ascii") if m else ""


def all_guids(body: bytes) -> list[str]:
    """Return every GUID-shaped string in *body*, in order of appearance."""
    return [g.decode() for g in GUID_RE.findall(body)]


def all_param_names(body: bytes) -> list[str]:
    """Return printable ASCII runs in *body* that look like parameter names.

    A run qualifies when it is 4 to 40 characters long and contains none of
    the substrings in ``_SKIP_MARKERS``.
    """
    names = []
    for m in PARAM_NAME_RE.finditer(body):
        s = m.group(0).decode("ascii")
        if any(skip in s for skip in _SKIP_MARKERS):
            continue
        # The regex already guarantees >= 4 printable ASCII characters, so
        # only the upper length bound needs checking here.
        if len(s) <= _MAX_PARAM_NAME_LEN:
            names.append(s)
    return names


def _load_records(path: Path) -> list[tuple[int, dict, bytes]]:
    """Parse the NDJSON capture into ``(line_index, record, decoded_body)`` tuples.

    Blank lines are skipped, but indices still count them so that an index
    always refers to the same physical line of the file.

    Raises:
        OSError: if *path* cannot be opened (e.g. ``FileNotFoundError``).
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        KeyError: if a record has no ``Base64`` field.
    """
    records = []
    # utf-8-sig drops a leading byte-order mark if the file has one.
    with path.open(encoding="utf-8-sig") as fh:
        for idx, line in enumerate(fh):
            if not line.strip():
                continue
            rec = json.loads(line)
            records.append((idx, rec, base64.b64decode(rec["Base64"])))
    return records


def _print_summary(records: list[tuple[int, dict, bytes]]) -> None:
    """Print one table row per record: index, length, action URI, first two GUIDs."""
    print()
    print(f"{'#':>3} {'Length':>6} {'Action':<40} {'GUIDs (first 2)'}")
    print("-" * 110)
    for idx, rec, body in records:
        action = first_action(body)
        guid_summary = ", ".join(all_guids(body)[:2])
        print(f"{idx:>3} {rec['Length']:>6} {action:<40} {guid_summary}")


def _print_hex_preview(body: bytes) -> None:
    """Print the first ``DUMP_BYTES`` bytes of *body* as offset / hex / ASCII rows."""
    head = body[:DUMP_BYTES]
    # Each byte renders as two hex digits plus a separator.
    hex_width = DUMP_ROW_BYTES * 3
    for off in range(0, len(head), DUMP_ROW_BYTES):
        chunk = head[off:off + DUMP_ROW_BYTES]
        hex_part = " ".join(f"{b:02X}" for b in chunk)
        ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
        print(f" {off:04X} {hex_part:<{hex_width}} |{ascii_part}|")


def _print_unknown_details(records: list[tuple[int, dict, bytes]]) -> None:
    """Print action, parameter-like strings, GUIDs and a hex preview for unknown records."""
    print()
    print(
        "# Detailed dump of unknown records "
        f"(action + param names + first {DUMP_BYTES} bytes hex)"
    )
    for idx, rec, body in records:
        if idx not in UNKNOWN_RECORD_INDICES:
            continue
        action = first_action(body)
        params = all_param_names(body)
        print()
        print(f"=== Record {idx} (length={rec['Length']}, action={action}) ===")
        print(f" Param-ish strings: {params}")
        print(f" GUIDs found : {all_guids(body)}")
        _print_hex_preview(body)


def main() -> int:
    """Print the capture inventory to stdout.

    Returns:
        0 on success, 1 if the capture file does not exist.
    """
    print(f"# Inventory of {CAPTURE.name}")
    try:
        records = _load_records(CAPTURE)
    except FileNotFoundError:
        print(f"error: capture file not found: {CAPTURE}", file=sys.stderr)
        return 1

    # Pass 1: action URIs and lengths.
    _print_summary(records)
    # Pass 2: detailed dump for the unknown records.
    _print_unknown_details(records)
    return 0


if __name__ == "__main__":
    sys.exit(main())