Files
histsdk/scripts/decode-event-capture.py
dohertj2 c95824a65d Initial commit: managed .NET 10 AVEVA Historian SDK + reverse-engineering toolkit
Full read-only SDK (src/AVEVA.Historian.Client) implementing the CLAUDE.md required
surface against AVEVA Historian's binary WCF protocol — no native AVEVA runtime
dependency. All operations live-verified against a local Historian:

- ProbeAsync, ReadRawAsync, ReadAggregateAsync, ReadAtTimeAsync, ReadEventsAsync
- BrowseTagNamesAsync, GetTagMetadataAsync (17 native data-type codes mapped)
- GetConnectionStatusAsync, GetStoreForwardStatusAsync, GetSystemParameterAsync
- 108/108 unit + integration tests pass

Includes the reverse-engineering toolkit (tools/AVEVA.Historian.ReverseEngineering)
used to decode the protocol: WCF probes, IL inspection via dnlib, and IL-rewrite
instrumentation (instrument-wcf-{write,read}message etc.) plus the .NET Framework
trace harness (tools/AVEVA.Historian.NativeTraceHarness) for parity testing.

Sanitized handoff evidence under docs/reverse-engineering/. Native AVEVA binaries
(current/, aveva-install-x64/, aveva-install-x86/) are gitignored — fetch separately
from the AVEVA installer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 06:31:48 -04:00

82 lines
2.9 KiB
Python

"""Inventory the writemessage event-flow capture: action URI + length + first bytes."""
import base64
import json
import re
import sys
from pathlib import Path
# Directory two levels above this script file (script dir -> its parent).
REPO_ROOT = Path(__file__).resolve().parent.parent

# NDJSON capture that this script inventories, addressed relative to REPO_ROOT.
CAPTURE = REPO_ROOT.joinpath(
    "artifacts",
    "reverse-engineering",
    "instrumented-wcf-writemessage",
    "writemessage-capture-event-latest.ndjson",
)

# Action URI of the form aa/<service>/<op>: <service> is one of
# Hist, Retr, Trx or Stor; <op> is one or more ASCII letters/digits.
ACTION_RE = re.compile(rb"aa/(?:Hist|Retr|Trx|Stor)/[A-Za-z0-9]+")

# 8-4-4-4-12 hex GUID text. Only upper-case hex digits are accepted, so a
# lower-case GUID in the payload is not matched by this pattern.
GUID_RE = re.compile(rb"[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}")

# Any run of four or more printable ASCII bytes (space through tilde).
PARAM_NAME_RE = re.compile(rb"[\x20-\x7E]{4,}")
def first_action(body: bytes) -> str:
    """Return the first action URI found in *body*.

    The module-level ``ACTION_RE`` pattern is searched once; the first hit
    is decoded as ASCII.  If the pattern does not occur, the placeholder
    ``"<no-action-found>"`` is returned instead.
    """
    hit = ACTION_RE.search(body)
    if hit is None:
        return "<no-action-found>"
    return hit.group(0).decode("ascii")
def all_guids(body: bytes) -> list[str]:
    """Return every ``GUID_RE`` match in *body* as text, in order of appearance.

    Each match is decoded from bytes to ``str``; repeated GUIDs are kept.
    """
    decoded = []
    for raw in GUID_RE.findall(body):
        decoded.append(raw.decode())
    return decoded
def all_param_names(body: bytes) -> list[str]:
    """Collect printable-ASCII runs in *body* that may be parameter names.

    Candidates are the matches of the module-level ``PARAM_NAME_RE`` pattern,
    decoded as ASCII.  A candidate is discarded when it contains one of the
    skip markers below (action URI prefix, endpoint URL scheme, and other
    fixed strings) or when its length is outside 3..40 characters.

    Returns the surviving strings in the order they occur in *body*.
    """
    skip_markers = ("aa/", "net.pipe://", "AVEVA", "DESKTOP-", "WORKGROUP", "NTLMSSP")

    def keep(text: str) -> bool:
        # Reject anything that embeds a known non-parameter marker.
        if any(marker in text for marker in skip_markers):
            return False
        return 3 <= len(text) <= 40 and text.isprintable()

    candidates = (hit.group(0).decode("ascii") for hit in PARAM_NAME_RE.finditer(body))
    return [text for text in candidates if keep(text)]
def main() -> int:
    """Print an inventory of the capture file and dump selected records.

    Reads ``CAPTURE`` as newline-delimited JSON, base64-decodes the
    ``Base64`` field of every record, and writes two passes to stdout:

    1. A table with the record index, the record's ``Length`` field, the
       first action URI, and the first two GUIDs found in the body.
    2. For each record index in the "unknown" set: the action, the
       param-ish strings, all GUIDs, and a hex/ASCII dump of the leading
       bytes of the body.

    Blank lines in the capture file are skipped; record indexes count
    records, not physical lines.

    Returns:
        0 on success, 1 if the capture file does not exist.
    """
    # Single source of truth for the hex dump size, so the printed header
    # always agrees with the number of bytes actually dumped.
    dump_limit = 160
    row_width = 32
    hex_width = row_width * 3  # "XX " per byte -> column width of the hex part

    try:
        fh = CAPTURE.open(encoding="utf-8-sig")
    except FileNotFoundError:
        print(f"error: capture file not found: {CAPTURE}", file=sys.stderr)
        return 1

    print(f"# Inventory of {CAPTURE.name}")
    records = []
    with fh:
        for line in fh:
            # Tolerate empty lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            rec = json.loads(line)
            body = base64.b64decode(rec["Base64"])
            records.append((len(records), rec, body))

    # Pass 1: action URIs and lengths.
    print()
    print(f"{'#':>3} {'Length':>6} {'Action':<40} {'GUIDs (first 2)'}")
    print("-" * 110)
    for idx, rec, body in records:
        action = first_action(body)
        # Joining an empty slice yields "", so no separate empty check is needed.
        guid_summary = ", ".join(all_guids(body)[:2])
        print(f"{idx:>3} {rec['Length']:>6} {action:<40} {guid_summary}")

    # Pass 2: detailed dump for the unknown records.
    unknown = {6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 18, 20}
    print()
    print(
        "# Detailed dump of unknown records "
        f"(action + param names + first {dump_limit} bytes hex)"
    )
    for idx, rec, body in records:
        if idx not in unknown:
            continue
        action = first_action(body)
        params = all_param_names(body)
        print()
        print(f"=== Record {idx} (length={rec['Length']}, action={action}) ===")
        print(f" Param-ish strings: {params}")
        print(f" GUIDs found : {all_guids(body)}")
        # Leading bytes of the body, one row per `row_width` bytes.
        for off in range(0, min(dump_limit, len(body)), row_width):
            chunk = body[off:off + row_width]
            hex_part = " ".join(f"{b:02X}" for b in chunk)
            ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
            print(f" {off:04X} {hex_part:<{hex_width}} |{ascii_part}|")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())