Files
histsdk/scripts/decode-add-tep-capture.py
T
Joseph Doherty 08b950caee R1.11 AddTagExtendedPropertiesAsync: extended-property write via AddTEx
Adds user-defined extended properties to an existing tag via the 2020 WCF
AddTEx (AddTagExtendedProperties) op. Write-enabled connection + uppercase
storage-session GUID handle; reuses the write orchestrator open/priming chain.

The AddTEx inBuff is the exact inverse of the R1.5 GetTepByNm read-response
framing, so the serializer mirrors the read parser:
  uint32 groupCount + 0x01(group) + [0x09+u16+ASCII tag] + uint32 propCount
  + per prop{ 0x02 + [0x09+u16+ASCII name] + 0x43 VT_BSTR + u16 payloadLen
  + u16 charCount + UTF-16 value } + 0x01(group trailer) + 0x00(terminator).
The trailing 0x00 is required — without it inBuff is one byte short and the
server throws SErrorException in CHistStorage::AddTagExtendedProperties. The
golden fixture pins the clean inBuff the live server accepted (dumped via
AVEVA_HISTORIAN_TEP_DUMP); read-back verified via R1.5. String (0x43) values only.

Delete (DelTep) is deferred: the native DeleteTagExtendedPropertiesByName does a
client-side sync check and returns err 229 for a just-added property, so the
DelTep request never reaches the wire and its inBuff can't be captured yet.

Shipped: HistorianClient.AddTagExtendedPropertiesAsync/AddTagExtendedPropertyAsync;
HistorianTagExtendedPropertyProtocol.SerializeAddRequest; orchestrator path;
golden WcfTagExtendedPropertyWriteProtocolTests (4); gated live write/read-back test;
native-harness `add-tep` scenario + Capture-AddTagExtendedProperties.ps1 +
decode-add-tep-capture.py. Doc: wcf-add-tag-extended-properties.md. 233 tests green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
2026-06-21 01:43:19 -04:00

116 lines
3.7 KiB
Python

"""Decode the AddTagExtendedProperties / DeleteTagExtendedProperties WCF inBuff (HCAL R1.11).
Reads the capture produced by scripts/Capture-AddTagExtendedProperties.ps1 and locates the AddTEx /
DelTep WriteMessage bodies by the sandbox tag + property name/value, then dumps the inBuff bytes so
the framing (tag name, property count, per-property name + value markers) can be read off. Compare to
the R1.5 read-response encoding in HistorianTagExtendedPropertyProtocol.
Output is diagnostic. Sanitize before copying into docs/.
"""
import base64
import json
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
CAPDIR = REPO_ROOT / "artifacts" / "reverse-engineering" / "instrumented-wcf-add-tep"
CAP = CAPDIR / "add-tep-capture-latest.ndjson"
TAG = "RetestSdkWriteTepTag"
PROP = "SdkTestProp"
VALUE = "SdkTestValue"
OP_ADD = b"AddTEx"
OP_DEL = b"DelTep"
def hexdump(label, buf, base=0):
print(f"=== {label}: {len(buf)} bytes ===")
for off in range(0, len(buf), 16):
c = buf[off:off + 16]
hp = " ".join(f"{x:02X}" for x in c)
ap = "".join(chr(x) if 32 <= x < 127 else "." for x in c)
print(f" {base + off:04X} {hp:<48} |{ap}|")
print()
def ascii_strings(buf, minlen=3):
out, cur, start = [], [], 0
for i, x in enumerate(buf):
if 32 <= x < 127:
if not cur:
start = i
cur.append(chr(x))
else:
if len(cur) >= minlen:
out.append((start, "".join(cur)))
cur = []
if len(cur) >= minlen:
out.append((start, "".join(cur)))
return out
def u16_strings(buf, minlen=3):
out, i = [], 0
while i < len(buf) - 1:
j, chars = i, []
while j < len(buf) - 1 and 32 <= buf[j] < 127 and buf[j + 1] == 0:
chars.append(chr(buf[j]))
j += 2
if len(chars) >= minlen:
out.append((i, "".join(chars)))
i = j
else:
i += 1
return out
def main() -> int:
if not CAP.exists():
print(f"Missing capture: {CAP}\nRun scripts/Capture-AddTagExtendedProperties.ps1 first.")
return 1
records = []
for line in CAP.open(encoding="utf-8-sig"):
if line.strip():
records.append(json.loads(line))
tag_a, prop_a, val_a = TAG.encode("ascii"), PROP.encode("ascii"), VALUE.encode("ascii")
tag_u, prop_u, val_u = TAG.encode("utf-16-le"), PROP.encode("utf-16-le"), VALUE.encode("utf-16-le")
print(f"== {len(records)} MDAS bodies captured ==")
for idx, rec in enumerate(records):
body = base64.b64decode(rec["Base64"])
flags = []
if OP_ADD in body:
flags.append("AddTEx")
if OP_DEL in body:
flags.append("DelTep")
if prop_a in body or prop_u in body:
flags.append("PROP")
if val_a in body or val_u in body:
flags.append("VALUE")
print(f" [{idx:02d}] {rec.get('Phase'):26s} len={len(body):5d} {','.join(flags)}")
def dump(op):
for idx, rec in enumerate(records):
body = base64.b64decode(rec["Base64"])
if rec.get("Phase") == "WCF.WriteMessage.Body" and op in body:
hexdump(f"[{idx}] {op.decode()} WriteMessage", body)
print(" UTF-16 strings:")
for off, s in u16_strings(body):
print(f" 0x{off:04X} {s!r}")
print(" ASCII strings:")
for off, s in ascii_strings(body):
print(f" 0x{off:04X} {s!r}")
print()
print("\n== AddTEx request(s) ==")
dump(OP_ADD)
print("\n== DelTep request(s) ==")
dump(OP_DEL)
return 0
if __name__ == "__main__":
sys.exit(main())