"""Decoder for the analog/state summary request capture (HCAL roadmap R1.8/R1.9). Reads the per-config NDJSON captures produced by scripts/Capture-SummaryRequest.ps1 under artifacts/reverse-engineering/instrumented-wcf-writemessage-summary/, extracts the Retr/StartQuery2 `pRequestBuff` payload from each, hex-dumps it, and diffs every summary candidate against the baseline-full request so the differing bytes (the native QueryType / SummaryType / AutoSummaryParameters fields) stand out. Output is diagnostic. The only printed strings are the SDK-chosen system tag name and protocol field markers — sanitize before copying any of it into docs/. """ import base64 import json import sys from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent CAPTURE_DIR = REPO_ROOT / "artifacts" / "reverse-engineering" / "instrumented-wcf-writemessage-summary" ACTION = b"aa/Retr/StartQuery2" PARAM = b"pRequestBuff" def extract_request_buffer(records): """Return the pRequestBuff bytes from the first StartQuery2 write record, or None.""" for rec in records: if rec.get("Phase") != "WCF.WriteMessage.Body": continue body = base64.b64decode(rec["Base64"]) if ACTION not in body: continue i = body.find(PARAM) if i < 0: continue i += len(PARAM) marker = body[i] # MDAS length markers (same scheme as the write decoder). if marker == 0x9E: length = body[i + 1] return body[i + 2:i + 2 + length] if marker == 0x9F: length = int.from_bytes(body[i + 1:i + 3], "little") return body[i + 3:i + 3 + length] if marker == 0xA0: length = int.from_bytes(body[i + 1:i + 3], "little") return body[i + 3:i + 3 + length + 1] return None return None def hexdump(payload, diff_against=None): for off in range(0, len(payload), 16): chunk = payload[off:off + 16] cells = [] for j, c in enumerate(chunk): mark = "" if diff_against is not None: k = off + j if k >= len(diff_against) or diff_against[k] != c: mark = "*" cells.append(f"{c:02X}{mark}") hp = " ".join(cells) ap = "".join(chr(c) if 32 <= c < 127 else "." for c in chunk) print(f" {off:04X} {hp:<56} |{ap}|") def load(path): with path.open(encoding="utf-8-sig") as fh: return [json.loads(line) for line in fh if line.strip()] def main() -> int: if not CAPTURE_DIR.exists(): print(f"Capture dir not found: {CAPTURE_DIR}") print("Run scripts/Capture-SummaryRequest.ps1 first.") return 1 captures = sorted(CAPTURE_DIR.glob("summary-capture-*-latest.ndjson")) if not captures: print(f"No capture files in {CAPTURE_DIR}") return 1 buffers = {} for path in captures: name = path.stem.replace("summary-capture-", "").replace("-latest", "") records = load(path) buf = extract_request_buffer(records) buffers[name] = buf status = f"{len(buf)} bytes" if buf else "" print(f"{name:<18} records={len(records):>3} pRequestBuff={status}") baseline = buffers.get("baseline-full") print() if not baseline: print("No baseline-full request buffer captured; cannot diff. Dumping each raw.") for name, buf in buffers.items(): if buf: print(f"\n== {name} pRequestBuff ({len(buf)} bytes) ==") hexdump(buf) return 0 print(f"== baseline-full pRequestBuff ({len(baseline)} bytes) ==") hexdump(baseline) for name, buf in buffers.items(): if name == "baseline-full" or not buf: continue print(f"\n== {name} pRequestBuff ({len(buf)} bytes) — '*' marks bytes differing from baseline ==") hexdump(buf, diff_against=baseline) return 0 if __name__ == "__main__": sys.exit(main())