"""Decode the AddS2 (AddStreamValues2) pBuf event-VTQ blob captured for event-send (R2.2). Extracts the `pBuf` parameter from the AddS2 WriteMessage body in the event-send capture and hex-dumps it, annotating windows that match the known test event so the HistorianEvent.PackToVtq framing can be read off and inverted into a managed serializer. Known test event (from scripts/Capture-EventSend.ps1 defaults): Type="User.Write", Namespace="RetestSdkEventSend", properties: Source="RetestSdkEventSend", TestMarker="histsdk-R2.1-capture" Output is diagnostic. Sanitize before copying into docs/. """ import base64 import json import struct import sys from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent CAPTURE = (REPO_ROOT / "artifacts" / "reverse-engineering" / "instrumented-wcf-event-send" / "event-send-capture-latest.ndjson") PARAM = b"pBuf" ADDS2 = b"AddS2" def extract_param(body, param): i = body.find(param) if i < 0: return None i += len(param) # Skip the closing of the element name / attributes until a binary length marker. # MDAS length markers: 0x9E (1-byte len), 0x9F (2-byte len), 0xA0 (2-byte len+1). for scan in range(i, min(i + 16, len(body))): marker = body[scan] if marker == 0x9E: length = body[scan + 1] return body[scan + 2:scan + 2 + length] if marker == 0x9F: length = int.from_bytes(body[scan + 1:scan + 3], "little") return body[scan + 3:scan + 3 + length] if marker == 0xA0: length = int.from_bytes(body[scan + 1:scan + 3], "little") return body[scan + 3:scan + 3 + length + 1] return None def main() -> int: if not CAPTURE.exists(): print(f"Capture not found: {CAPTURE}") return 1 with CAPTURE.open(encoding="utf-8-sig") as fh: records = [json.loads(line) for line in fh if line.strip()] body = None for r in records: if r.get("Phase") != "WCF.WriteMessage.Body": continue b = base64.b64decode(r["Base64"]) if ADDS2 in b: body = b break if body is None: print("No AddS2 WriteMessage body found.") return 2 pbuf = extract_param(body, PARAM) if pbuf is None: print("Found AddS2 body but could not extract pBuf. Full body hex dump:") pbuf = body print(f"pBuf: {len(pbuf)} bytes\n") for off in range(0, len(pbuf), 16): chunk = pbuf[off:off + 16] hp = " ".join(f"{c:02X}" for c in chunk) ap = "".join(chr(c) if 32 <= c < 127 else "." for c in chunk) print(f" {off:04X} {hp:<48} |{ap}|") print("\n== ASCII strings (len>=3) ==") cur = [] start = 0 for i, c in enumerate(pbuf): if 32 <= c < 127: if not cur: start = i cur.append(chr(c)) else: if len(cur) >= 3: print(f" 0x{start:04X} {''.join(cur)!r}") cur = [] if len(cur) >= 3: print(f" 0x{start:04X} {''.join(cur)!r}") print("\n== UTF-16LE strings (len>=3) ==") i = 0 while i < len(pbuf) - 1: j = i chars = [] while j < len(pbuf) - 1 and 32 <= pbuf[j] < 127 and pbuf[j + 1] == 0: chars.append(chr(pbuf[j])) j += 2 if len(chars) >= 3: print(f" 0x{i:04X} {''.join(chars)!r}") i = j else: i += 1 return 0 if __name__ == "__main__": sys.exit(main())