"""Decode the GetHistorianInfo (GETHI) WCF request/response (HCAL R1.4). Reads the chained WriteMessage+ReadMessage capture produced by scripts/Capture-HistorianInfo.ps1 and locates the GetHistorianInfo exchange. The goal is to learn (a) the pRequestBuff that returns the FULL HISTORIAN_INFO struct (distinct from the named-value "HistorianVersion" request) and (b) the response struct layout: the analysis folder says it's 518 bytes with the version string (UTF-16, null-terminated) at offset 0 and EventStorageMode (int32) at offset 514. We flag candidate bodies by the GETHI op action, by the server version value, and by a response length near 518, then dump bytes + the int32 at offset 514 so the layout can be read off directly. Output is diagnostic. Sanitize before copying into docs/. """ import base64 import json import struct import sys from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent CAPDIR = REPO_ROOT / "artifacts" / "reverse-engineering" / "instrumented-wcf-historian-info" CAP = CAPDIR / "historian-info-capture-latest.ndjson" # The GETHI op action (WS-Addressing) the native client sends. The server version value is # version-shaped, not secret; used only to locate the response. OP_ASCII = b"GetHistorianInfo" OP_GETHI = b"GETHI" VERSION = "20,0,000,000" VERSION_U16 = VERSION.encode("utf-16-le") VERSION_ASCII = VERSION.encode("ascii") def hexdump(label, buf, base=0): print(f"=== {label}: {len(buf)} bytes ===") for off in range(0, len(buf), 16): c = buf[off:off + 16] hp = " ".join(f"{x:02X}" for x in c) ap = "".join(chr(x) if 32 <= x < 127 else "." for x in c) print(f" {base + off:04X} {hp:<48} |{ap}|") print() def ascii_strings(buf, minlen=3): out, cur, start = [], [], 0 for i, x in enumerate(buf): if 32 <= x < 127: if not cur: start = i cur.append(chr(x)) else: if len(cur) >= minlen: out.append((start, "".join(cur))) cur = [] if len(cur) >= minlen: out.append((start, "".join(cur))) return out def u16_strings(buf, minlen=3): out, i = [], 0 while i < len(buf) - 1: j, chars = i, [] while j < len(buf) - 1 and 32 <= buf[j] < 127 and buf[j + 1] == 0: chars.append(chr(buf[j])) j += 2 if len(chars) >= minlen: out.append((i, "".join(chars))) i = j else: i += 1 return out def main() -> int: if not CAP.exists(): print(f"Missing capture: {CAP}\nRun scripts/Capture-HistorianInfo.ps1 first.") return 1 records = [] for line in CAP.open(encoding="utf-8-sig"): if line.strip(): records.append(json.loads(line)) print(f"== {len(records)} MDAS bodies captured ==") for idx, rec in enumerate(records): body = base64.b64decode(rec["Base64"]) flags = [] if OP_ASCII in body or OP_GETHI in body: flags.append("GETHI-OP") if VERSION_U16 in body or VERSION_ASCII in body: flags.append("VERSION") # A ~518-byte embedded struct is the tell for the full-info response. if 500 <= len(body) <= 4096: flags.append(f"len={len(body)}") print(f" [{idx:02d}] {rec.get('Phase'):26s} len={len(body):5d} {','.join(flags)}") def find(predicate): hits = [] for idx, rec in enumerate(records): body = base64.b64decode(rec["Base64"]) if predicate(rec, body): hits.append((idx, rec, body)) return hits print("\n== Request candidate(s): WriteMessage bodies tagged GETHI-OP ==") for idx, rec, body in find(lambda r, b: r.get("Phase") == "WCF.WriteMessage.Body" and (OP_ASCII in b or OP_GETHI in b)): hexdump(f"[{idx}] WriteMessage", body) print(" UTF-16 strings:") for off, s in u16_strings(body): print(f" 0x{off:04X} {s!r}") print(" ASCII strings:") for off, s in ascii_strings(body): print(f" 0x{off:04X} {s!r}") print() print("\n== Response candidate(s): ReadMessage bodies carrying VERSION ==") for idx, rec, body in find(lambda r, b: r.get("Phase") == "WCF.ReadMessage.Body" and (VERSION_U16 in b or VERSION_ASCII in b)): hexdump(f"[{idx}] ReadMessage", body) print(" UTF-16 strings:") for off, s in u16_strings(body): print(f" 0x{off:04X} {s!r}") # The analysis folder pins EventStorageMode @ offset 514 (int32) inside the # 518-byte struct. The struct is embedded in the MDAS body at some base; scan for # a plausible version@0 run and print the int32 514 bytes after each candidate base. print(" Candidate struct decodes (version@base, int32 @ base+514):") for base_off, s in u16_strings(body): if any(ch.isdigit() for ch in s) and "," in s: idx514 = base_off + 514 if idx514 + 4 <= len(body): mode = struct.unpack_from("