"""Decode the GetNextQueryResultBuffer2 *response* for an analog summary (HCAL R1.8).

Reads the both-hooks capture produced by

    scripts/Capture-SummaryRequest.ps1 -OnlyConfig analog-avg -WithResponse

finds the ReadMessage record carrying GetNextQueryResultBuffer2Response,
extracts the `pResultBuff` payload, hex-dumps it, and annotates every 8-byte
window that decodes to a known ground-truth value (the AnalogSummaryHistory
row for SysTimeSec) so the field offsets of CAnalogSummaryValue can be read
off directly.

Output is diagnostic; the only printed strings are the SDK-chosen system tag
name and field markers. Sanitize before copying into docs/.
"""

import base64
import json
import struct
import sys
from pathlib import Path

# Two levels up from this file.
REPO_ROOT = Path(__file__).resolve().parent.parent

# Config name (analog-avg / analog-min / analog-max / …) selectable via argv[1].
CONFIG = "analog-avg" if len(sys.argv) < 2 else sys.argv[1]

# Capture file that is read for the selected config.
CAPTURE = REPO_ROOT.joinpath(
    "artifacts",
    "reverse-engineering",
    "instrumented-wcf-writemessage-summary",
    f"summary-capture-{CONFIG}-latest.ndjson",
)

# Byte markers searched for inside each decoded message body.
RESP = b"GetNextQueryResultBuffer2Response"
PARAM = b"pResultBuff"

# Ground-truth values from AnalogSummaryHistory(SysTimeSec, 1h cycle) — used to label offsets.
KNOWN_DOUBLES = {
    31.0: "31.0 (First/Last/Average)",
    100.0: "100.0 (PercentGood)",
    0.031: "0.031 (Integral)",
    111600.0: "111600.0 (Integral, full-cycle)",
    1.0: "1.0 (ValueCount as double?)",
}

# Labels for 4-byte little-endian unsigned windows of the payload.
KNOWN_U32 = {
    1: "ValueCount=1",
    192: "OPCQuality=192",
    100: "PercentGood=100",
    9: "version=9",
}

# [MC-NBFX] byte-array text records: record type -> width in bytes of the
# little-endian length prefix. Each record's "WithEndElement" twin is the
# same type with the low bit set (0x9F, 0xA1, 0xA3) and has the same layout.
_BYTES_RECORD_LENGTH_WIDTH = {
    0x9E: 1,  # Bytes8Text
    0xA0: 2,  # Bytes16Text
    0xA2: 4,  # Bytes32Text
}


def extract_param(body, param):
    """Return the byte-array value that directly follows *param* in *body*.

    The first occurrence of *param* is located and the byte right after it is
    read as an [MC-NBFX] Bytes8/16/32Text record type (with or without the
    EndElement flag), followed by a little-endian length and the data.

    Args:
        body: Raw binary-XML message body (bytes).
        param: Element name to look for, as bytes.

    Returns:
        The declared number of payload bytes (fewer if *body* ends early), or
        None when *param* is absent, nothing follows it, the record type is
        not a byte-array record, or the length prefix is truncated.
    """
    name_at = body.find(param)
    if name_at < 0:
        return None
    pos = name_at + len(param)
    if pos >= len(body):
        # The name is the last thing in the body: there is no record to read.
        return None
    # Clearing the low bit folds each WithEndElement variant onto its base type.
    width = _BYTES_RECORD_LENGTH_WIDTH.get(body[pos] & 0xFE)
    if width is None:
        return None
    data_at = pos + 1 + width
    if data_at > len(body):
        return None
    length = int.from_bytes(body[pos + 1:data_at], "little")
    return body[data_at:data_at + length]


def _label_for_double(value):
    """Return the KNOWN_DOUBLES label matching *value*, or None.

    Matching uses a small relative tolerance so a value that differs from the
    table entry only by rounding is still labelled. NaN and infinities never
    match because every comparison against them is false.
    """
    for known, label in KNOWN_DOUBLES.items():
        if abs(value - known) <= 1e-9 * max(1.0, abs(known)):
            return label
    return None


def main() -> int:
    """Dump and annotate the pResultBuff payload of the selected capture.

    Returns:
        0 on success, 1 when the capture file does not exist, 2 when no
        response record with a decodable pResultBuff was found.
    """
    if not CAPTURE.exists():
        print(f"Capture not found: {CAPTURE}")
        # Suggest the command for the config that was actually requested.
        print(f"Run: scripts/Capture-SummaryRequest.ps1 -OnlyConfig {CONFIG} -WithResponse")
        return 1

    payload = None
    # utf-8-sig tolerates a leading byte-order mark in the capture file.
    with CAPTURE.open(encoding="utf-8-sig") as fh:
        for line in fh:
            if not line.strip():
                continue
            rec = json.loads(line)
            if rec.get("Phase") != "WCF.ReadMessage.Body":
                continue
            body = base64.b64decode(rec["Base64"])
            if RESP not in body:
                continue
            payload = extract_param(body, PARAM)
            # Keep scanning if this response carried no decodable buffer.
            if payload is not None:
                break

    if payload is None:
        print("No GetNextQueryResultBuffer2Response / pResultBuff found in capture.")
        return 2

    print(f"pResultBuff: {len(payload)} bytes")
    if len(payload) >= 6:
        version = int.from_bytes(payload[0:2], "little")
        row_count = int.from_bytes(payload[2:6], "little")
        print(f" header: version={version} rowCount={row_count}")
    print()

    # Annotated hex dump.
    for off in range(0, len(payload), 16):
        chunk = payload[off:off + 16]
        hp = " ".join(f"{c:02X}" for c in chunk)
        ap = "".join(chr(c) if 32 <= c < 127 else "." for c in chunk)
        print(f" {off:04X} {hp:<48} |{ap}|")

    # Scan every 8-byte window for known doubles, and every 4-byte window for known u32s.
    print("\n== Known-value hits (offset -> field) ==")
    for off in range(0, len(payload) - 7):
        val = struct.unpack_from("<d", payload, off)[0]
        label = _label_for_double(val)
        if label is not None:
            print(f" 0x{off:04X} double {val:>14} -> {label}")
    for off in range(0, len(payload) - 3):
        val = int.from_bytes(payload[off:off + 4], "little")
        if val in KNOWN_U32:
            print(f" 0x{off:04X} uint32 {val:>14} -> {KNOWN_U32[val]}")

    # FILETIME windows (plausible 2026 timestamps: 0x01DC.. high dword).
    print("\n== Plausible FILETIME windows (Int64, year ~2020-2030) ==")
    for off in range(0, len(payload) - 7):
        ft = int.from_bytes(payload[off:off + 8], "little")
        # FILETIME for 2020-01-01 ~= 0x01D5BF.. ; 2030 ~= 0x01E5.. — gate by high word.
        if 0x01D5_0000_0000_0000 <= ft <= 0x01E6_0000_0000_0000:
            print(f" 0x{off:04X} filetime 0x{ft:016X}")
    return 0


if __name__ == "__main__":
    sys.exit(main())