Files
histsdk/scripts/decode-event-filter-capture.py
T
Joseph Doherty 6d470eab4a R1.7: server-side event filters — ReadEventsAsync(HistorianEventFilter), live-honored
Roadmap M1 R1.7. Filters are set on the native EventQuery object via
AddEventFilter(property, HistorianComparisionType, value) — NOT EventQueryArgs
(time/count/order only). Found via a new harness --dump-type-members command.

Captured the native filtered StartEventQuery pRequestBuff (Capture-EventFilter.ps1 +
harness --event-filter knob) and diffed Equal(0) vs Contains(12) to isolate the
operator field. Filter block (decoded byte-for-byte):
  ushort 0 + uint filterCount + uint condCount + uint nameLen + name(UTF-16) +
  uint 1 + ushort op + uint 1 + value(0x09-LEN-0x00 compact-ASCII) + byte 0

The filter is REAL, not inert (unlike the analog-summary knobs): a non-matching
predicate returns 0 events; Type=Equal=User.Write returns only User.Write events.
Verified live via both the native harness and the SDK.

- HistorianClient.ReadEventsAsync(start, end, HistorianEventFilter, ct) overload
- HistorianEventFilter + HistorianEventComparison (18 ops, ordinals = native)
- Filter encoding in HistorianEventQueryProtocol (empty-filter path unchanged)
- Golden-byte tests (block match, op field, empty-filter regression) + gated live test

Single string-valued predicate only; multi-filter (OR) / multi-condition (AND via
AddEventFilterCondition) framing is partially captured and not shipped. 216 unit
tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
2026-06-20 18:32:03 -04:00

114 lines
4.0 KiB
Python

"""Decode the StartEventQuery filter-block encoding (HCAL R1.7).
Extracts the `pRequestBuff` from the StartEventQuery WriteMessage body in the baseline
(no filter) and filtered captures produced by scripts/Capture-EventFilter.ps1, dumps both,
and marks where they diverge so the filter predicate (property name / comparison op / value)
can be read off the empty-filter baseline.
Output is diagnostic. Sanitize before copying into docs/.
"""
import base64
import json
import sys
from pathlib import Path
# Directory one level above the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Directory in which main() looks for the baseline and filtered capture files.
CAPDIR = REPO_ROOT / "artifacts" / "reverse-engineering" / "instrumented-wcf-event-filter"
# Byte strings searched for in each decoded message body: PARAM is the parameter name
# whose value is extracted; OP must be present for a body to be considered at all.
PARAM = b"pRequestBuff"
OP = b"StartEventQuery"
# Width in bytes of the little-endian length prefix for each binary-XML "Bytes" text
# record type ([MC-NBFX]): 0x9E Bytes8Text, 0x9F Bytes8TextWithEndElement,
# 0xA0 Bytes16Text, 0xA1 Bytes16TextWithEndElement. Odd types also close the element.
# Bytes32 records (0xA2/0xA3) are deliberately not handled.
_BYTES_RECORD_LEN_WIDTH = {0x9E: 1, 0x9F: 1, 0xA0: 2, 0xA1: 2}


def _read_bytes_records(body, pos):
    """Decode the run of Bytes text records starting at *pos*; None if there is none."""
    chunks = []
    while pos < len(body):
        record_type = body[pos]
        width = _BYTES_RECORD_LEN_WIDTH.get(record_type)
        data_at = pos + 1 + (width or 0)
        if width is None or data_at > len(body):
            # Not a Bytes record, or the length prefix itself is cut off.
            break
        size = int.from_bytes(body[pos + 1:data_at], "little")
        # Slicing tolerates a payload shorter than declared (truncated capture).
        chunks.append(body[data_at:data_at + size])
        if record_type & 1:
            # "...WithEndElement" variant: the element value is complete.
            break
        # A value may be split over several consecutive records; keep collecting.
        pos = data_at + size
    return b"".join(chunks) if chunks else None


def extract_request(path):
    """Return the ``pRequestBuff`` payload of the first StartEventQuery body in *path*.

    *path* is a ``pathlib.Path`` to an NDJSON capture. Blank lines are skipped; every
    other line is parsed as JSON. Only records whose ``Phase`` equals
    ``"WCF.WriteMessage.Body"`` are examined: their ``Base64`` field is decoded and
    the body must contain both ``OP`` and ``PARAM``. The 16 bytes following the
    parameter name are scanned for the first Bytes text record, and the payload of
    that record (plus any Bytes records chained directly after it) is returned.

    Returns:
        The payload as ``bytes``, or ``None`` when the file does not exist or no
        record yields a payload.

    Raises:
        json.JSONDecodeError: a non-blank line is not valid JSON.
        KeyError: a ``WCF.WriteMessage.Body`` record has no ``Base64`` field.
    """
    if not path.exists():
        return None
    # The context manager closes the capture on every exit path, including the
    # early return from inside the loop.
    with path.open(encoding="utf-8-sig") as capture:
        for line in capture:
            if not line.strip():
                continue
            rec = json.loads(line)
            if rec.get("Phase") != "WCF.WriteMessage.Body":
                continue
            body = base64.b64decode(rec["Base64"])
            if OP not in body:
                continue
            name_at = body.find(PARAM)
            if name_at < 0:
                continue
            window_start = name_at + len(PARAM)
            window_end = min(window_start + 16, len(body))
            for pos in range(window_start, window_end):
                payload = _read_bytes_records(body, pos)
                if payload is not None:
                    return payload
    return None
def hexdump(label, buf):
    """Print *buf* to stdout as a labelled hex/ASCII dump, 16 bytes per row.

    A header line gives *label* and the buffer length. Each row shows the row's
    starting offset in hex, the bytes as two-digit uppercase hex, and the same
    bytes as text with anything outside printable ASCII (32..126) shown as ".".
    A blank line is printed after the last row.
    """
    total = len(buf)
    print(f"=== {label}: {total} bytes ===")
    offset = 0
    while offset < total:
        row = buf[offset:offset + 16]
        hex_cells = []
        text_cells = []
        for value in row:
            hex_cells.append(f"{value:02X}")
            text_cells.append(chr(value) if 32 <= value < 127 else ".")
        hex_part = " ".join(hex_cells)
        text_part = "".join(text_cells)
        print(f" {offset:04X} {hex_part:<48} |{text_part}|")
        offset += 16
    print()
def _ascii_runs(buf, min_len=3):
    """Yield (offset, text) for each run of at least *min_len* printable ASCII bytes."""
    start = None
    for i, x in enumerate(buf):
        if 32 <= x < 127:
            if start is None:
                start = i
            continue
        if start is not None and i - start >= min_len:
            yield start, "".join(chr(b) for b in buf[start:i])
        start = None
    # A run that reaches the end of the buffer has no terminating byte; emit it too.
    if start is not None and len(buf) - start >= min_len:
        yield start, "".join(chr(b) for b in buf[start:])


def _utf16le_runs(buf, min_len=3):
    """Yield (offset, text) for each run of at least *min_len* ASCII-range UTF-16LE chars."""
    last = len(buf) - 1
    i = 0
    while i < last:
        j = i
        # A character counts when its low byte is printable ASCII and its high byte is 0.
        while j < last and 32 <= buf[j] < 127 and buf[j + 1] == 0:
            j += 2
        if (j - i) // 2 >= min_len:
            yield i, "".join(chr(b) for b in buf[i:j:2])
            i = j
        else:
            # Retry from the next byte so odd-aligned strings are still found.
            i += 1


def main() -> int:
    """Dump and compare the baseline and filtered ``pRequestBuff`` captures.

    Loads both payloads from ``CAPDIR`` via ``extract_request``, hex-dumps each,
    prints the first offset at which they differ followed by the filtered bytes
    from that offset onward, and finally lists the printable ASCII and UTF-16LE
    strings (three characters or longer) found in the filtered buffer.

    Returns:
        0 on success; 1 if either payload could not be extracted.
    """
    base = extract_request(CAPDIR / "event-filter-capture-baseline-latest.ndjson")
    filt = extract_request(CAPDIR / "event-filter-capture-filtered-latest.ndjson")
    if base is None or filt is None:
        print("Missing capture(s). Run scripts/Capture-EventFilter.ps1 first.")
        print(f" baseline: {'ok' if base is not None else 'MISSING'}")
        print(f" filtered: {'ok' if filt is not None else 'MISSING'}")
        return 1
    hexdump("baseline (no filter) pRequestBuff", base)
    hexdump("filtered pRequestBuff", filt)

    # First divergence offset; falls back to the shorter length when one buffer
    # is a prefix of the other.
    n = min(len(base), len(filt))
    div = next((i for i in range(n) if base[i] != filt[i]), n)
    print(f"== First divergence at offset 0x{div:04X} (lenBase={len(base)} lenFilt={len(filt)}) ==")
    print(" Filtered bytes from divergence (the inserted filter block):")
    tail = filt[div:]
    for off in range(0, len(tail), 16):
        c = tail[off:off + 16]
        hp = " ".join(f"{x:02X}" for x in c)
        ap = "".join(chr(x) if 32 <= x < 127 else "." for x in c)
        # Offsets are reported relative to the whole filtered buffer, not the tail.
        print(f" {div + off:04X} {hp:<48} |{ap}|")

    print("\n== Strings in filtered buffer ==")
    for start, text in _ascii_runs(filt):
        print(f" ASCII 0x{start:04X} {text!r}")
    for start, text in _utf16le_runs(filt):
        print(f" UTF-16LE 0x{start:04X} {text!r}")
    return 0
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the exit status.
    raise SystemExit(main())