R1.7: server-side event filters — ReadEventsAsync(HistorianEventFilter), live-honored
Roadmap M1 R1.7. Filters are set on the native EventQuery object via AddEventFilter(property, HistorianComparisionType, value) — NOT EventQueryArgs (time/count/order only). Found via a new harness --dump-type-members command. Captured the native filtered StartEventQuery pRequestBuff (Capture-EventFilter.ps1 + harness --event-filter knob) and diffed Equal(0) vs Contains(12) to isolate the operator field. Filter block (decoded byte-for-byte): ushort 0 + uint filterCount + uint condCount + uint nameLen + name(UTF-16) + uint 1 + ushort op + uint 1 + value(0x09-LEN-0x00 compact-ASCII) + byte 0 The filter is REAL, not inert (unlike the analog-summary knobs): a non-matching predicate returns 0 events; Type=Equal=User.Write returns only User.Write events. Verified live via both the native harness and the SDK. - HistorianClient.ReadEventsAsync(start, end, HistorianEventFilter, ct) overload - HistorianEventFilter + HistorianEventComparison (18 ops, ordinals = native) - Filter encoding in HistorianEventQueryProtocol (empty-filter path unchanged) - Golden-byte tests (block match, op field, empty-filter regression) + gated live test Single string-valued predicate only; multi-filter (OR) / multi-condition (AND via AddEventFilterCondition) framing is partially captured and not shipped. 216 unit tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -36,10 +36,11 @@ HCAL replacement, built on the **2023 R2 gRPC transport**. Derived from
|
||||
> the boundary is the **handle type** (see the string-handle wall note under §1b and
|
||||
> `docs/reverse-engineering/wcf-string-handle-wall.md`): **`uint`-handle ops work, `string`-handle
|
||||
> ops are blocked.** GETHI/GetTepByNm were probed and confirmed blocked (not, as first guessed,
|
||||
> reachable). The genuinely reachable next items on 2020 WCF are the remaining **`uint`-handle**
|
||||
> ops: **R1.8/R1.9 StartQuery summary/state modes** and **R1.7 event filters** (filter bytes ride
|
||||
> the proven `uint`-handle `StartEventQuery`). Everything string-handle waits on one RE target:
|
||||
> the native session/filter registration.
|
||||
> reachable). The reachable **`uint`-handle** items are now **DONE**: ~~R1.8/R1.9 StartQuery
|
||||
> summary/state modes~~ (resolved = existing `ReadAggregateAsync`) and ~~R1.7 event filters~~
|
||||
> (✅ 2026-06-20 — `ReadEventsAsync(…, HistorianEventFilter)`, live-honored). M2 event send is
|
||||
> also done (✅ WCF `AddS2`). Everything string-handle still waits on one RE target: the native
|
||||
> session/filter registration.
|
||||
|
||||
## Guiding principles
|
||||
|
||||
@@ -104,7 +105,7 @@ read/browse/status surface is Windows-free and the gRPC stack is the default pat
|
||||
| ~~R1.4~~ | `GetHistorianInfoAsync` | `Status.GetHistorianInfo` | ⛔ **string-handle wall** — GETHI returns code 1 on 2020 WCF (all handle/priming variants). GETHI buffer incl. `EventStorageMode`@514. | string-handle RE |
|
||||
| ~~R1.5~~ | Extended-property **read** | `Retrieval.GetTagExtendedPropertiesFromName` | ⛔ **string-handle wall** (GetTepByNm takes `string handle`). TEP result buffer. | string-handle RE |
|
||||
| ~~R1.6~~ | Localized-property **read** | `Retrieval.GetTagLocalizedPropertiesFromName` | ⛔ **string-handle wall** (same family). | string-handle RE |
|
||||
| R1.7 | Event **filters** | filter bytes in `Retrieval.StartEventQuery` | filter predicate encoding (name/op/value) — **`uint`-handle**, reachable | R0.5 |
|
||||
| ~~R1.7~~ | Event **filters** | filter bytes in `Retrieval.StartEventQuery` | ✅ **DONE (2026-06-20), live-honored.** `ReadEventsAsync(start, end, HistorianEventFilter)`. The filter rides `StartEventQuery`'s `pRequestBuff` (captured via `EventQuery.AddEventFilter` + instrument-wcf-writemessage; Equal vs Contains diffed to isolate the op). Filter block: `ushort 0 + uint filterCount + uint condCount + uint nameLen + name(UTF-16) + uint 1 + ushort op + uint 1 + value(0x09-len-0x00 compact-ASCII) + byte 0`. **REAL, not inert** (a non-matching predicate returns 0 events; matching returns the subset). Single string-valued predicate only; multi-filter (OR) / multi-condition (AND via `AddEventFilterCondition`) framing not yet fully captured. See `HistorianEventFilter`, golden `WcfEventQueryProtocolTests`. | — |
|
||||
| R1.8 | Analog-summary query | `Retrieval.StartQuery` (summary mode) | summary row layout — **`uint`-handle, reachable. Scoped + decode targets located** (`CAnalogSummaryValue.UnpackFromValueBuffer`, fields Min/Max/First/Last/ValueCount/Integral/…). Plan: [`r1.8-r1.9-summary-queries.md`](r1.8-r1.9-summary-queries.md) | — |
|
||||
| R1.9 | State-summary query | `Retrieval.StartQuery` (state mode) | state-summary row layout — **`uint`-handle, reachable. Scoped** (`CStateSummaryStruct`: MinContained/MaxContained/TotalContained/PartialStart/PartialEnd/StateEntryCount). Plan: [`r1.8-r1.9-summary-queries.md`](r1.8-r1.9-summary-queries.md) | — |
|
||||
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
<#
|
||||
.SYNOPSIS
|
||||
Captures the native client's StartEventQuery request bytes WITH and WITHOUT an event filter
|
||||
(HCAL roadmap R1.7) so the filter-predicate encoding can be decoded against the empty-filter
|
||||
baseline instead of guessed.
|
||||
|
||||
.DESCRIPTION
|
||||
Drives the NativeTraceHarness `event` scenario against the live Historian under an
|
||||
IL-rewritten aahClientManaged.dll whose ClientMessageEncoder.WriteMessage is instrumented to
|
||||
log every outgoing MDAS body. Runs twice:
|
||||
- baseline : no filter (the known empty-filter StartEventQuery)
|
||||
- filtered : EventQuery.AddEventFilter("Area", Equal, "RetestFilterArea") before StartQuery
|
||||
|
||||
Diff the two StartEventQuery request buffers (scripts/decode-event-filter-capture.py) to read
|
||||
off the exact filter-block bytes (property name / comparison op / value) the native client
|
||||
emits, then implement the managed predicate against that.
|
||||
|
||||
.NOTES
|
||||
Artifacts are diagnostic and gitignored. Sanitize before copying into docs/. Never commit raw
|
||||
capture NDJSON, credentials, hostnames, or customer tag names.
|
||||
#>
|
||||
[CmdletBinding()]
|
||||
param(
|
||||
[string]$ServerName = "localhost",
|
||||
[int]$TcpPort = 32568,
|
||||
[int]$LookbackMinutes = 43200,
|
||||
# Property:Op:Value (Op = a HistorianComparisionType name, e.g. Equal/Contains/GreaterThan).
|
||||
[string]$Filter = "Area:Equal:RetestFilterArea",
|
||||
[string]$Configuration = "Debug"
|
||||
)
|
||||
|
||||
$ErrorActionPreference = "Stop"
|
||||
$repoRoot = Split-Path -Parent $PSScriptRoot
|
||||
Set-Location $repoRoot
|
||||
|
||||
$reProj = Join-Path $repoRoot "tools\AVEVA.Historian.ReverseEngineering\AVEVA.Historian.ReverseEngineering.csproj"
|
||||
$harnessProj = Join-Path $repoRoot "tools\AVEVA.Historian.NativeTraceHarness\AVEVA.Historian.NativeTraceHarness.csproj"
|
||||
$instrProj = Join-Path $repoRoot "tools\AVEVA.Historian.ReverseInstrumentation\AVEVA.Historian.ReverseInstrumentation.csproj"
|
||||
|
||||
$captureDir = Join-Path $repoRoot "artifacts\reverse-engineering\instrumented-wcf-event-filter"
|
||||
$currentCopy = Join-Path $captureDir "current-copy"
|
||||
$instrDll = Join-Path $captureDir "aahClientManaged.dll"
|
||||
|
||||
Write-Host "== Building tooling ($Configuration) ==" -ForegroundColor Cyan
|
||||
dotnet build $reProj -c $Configuration --nologo -v q | Out-Null
|
||||
dotnet build $instrProj -c $Configuration --nologo -v q | Out-Null
|
||||
dotnet build $harnessProj -c $Configuration --nologo -v q | Out-Null
|
||||
|
||||
$instrSourceDll = Get-ChildItem -Recurse (Join-Path $repoRoot "tools\AVEVA.Historian.ReverseInstrumentation\bin\$Configuration") `
|
||||
-Filter "AVEVA.Historian.ReverseInstrumentation.dll" | Select-Object -First 1 -ExpandProperty FullName
|
||||
if (-not $instrSourceDll) { throw "ReverseInstrumentation.dll not found under bin\$Configuration." }
|
||||
|
||||
Write-Host "== Instrumenting WriteMessage ==" -ForegroundColor Cyan
|
||||
New-Item -ItemType Directory -Force -Path $captureDir | Out-Null
|
||||
dotnet run --no-build -c $Configuration --project $reProj -- `
|
||||
instrument-wcf-writemessage (Join-Path $repoRoot "current\aahClientManaged.dll") $instrDll | Out-Null
|
||||
|
||||
Write-Host "== Staging current-copy ==" -ForegroundColor Cyan
|
||||
robocopy (Join-Path $repoRoot "current") $currentCopy /MIR /NJH /NJS /NDL /NP /NC /NS | Out-Null
|
||||
Copy-Item -Force $instrDll (Join-Path $currentCopy "aahClientManaged.dll")
|
||||
Copy-Item -Force $instrSourceDll (Join-Path $currentCopy "AVEVA.Historian.ReverseInstrumentation.dll")
|
||||
|
||||
$harnessDll = Join-Path $currentCopy "aahClientManaged.dll"
|
||||
|
||||
$matrix = @(
|
||||
@{ Name = "baseline"; Args = @() },
|
||||
@{ Name = "filtered"; Args = @("--event-filter", $Filter) }
|
||||
)
|
||||
|
||||
foreach ($cfg in $matrix) {
|
||||
$capturePath = Join-Path $captureDir "event-filter-capture-$($cfg.Name)-latest.ndjson"
|
||||
if (Test-Path $capturePath) { Remove-Item -Force $capturePath }
|
||||
$env:AVEVA_HISTORIAN_RE_CAPTURE = $capturePath
|
||||
|
||||
Write-Host "== Capturing: $($cfg.Name) ==" -ForegroundColor Green
|
||||
$harnessArgs = @(
|
||||
"--scenario", "event",
|
||||
"--server-name", $ServerName,
|
||||
"--tcp-port", "$TcpPort",
|
||||
"--lookback-minutes", "$LookbackMinutes",
|
||||
"--max-rows", "1",
|
||||
"--current-dir", $currentCopy,
|
||||
"--managed-dll-path", $harnessDll
|
||||
) + $cfg.Args
|
||||
|
||||
try {
|
||||
$prevEap = $ErrorActionPreference
|
||||
$ErrorActionPreference = "Continue"
|
||||
& dotnet run --no-build -c $Configuration --project $harnessProj -- @harnessArgs 2>&1 | Out-Null
|
||||
} catch {
|
||||
Write-Host " ($($cfg.Name) raised: $($_.Exception.Message))" -ForegroundColor Yellow
|
||||
} finally {
|
||||
$ErrorActionPreference = $prevEap
|
||||
}
|
||||
|
||||
$recCount = if (Test-Path $capturePath) { (Get-Content $capturePath | Where-Object { $_.Trim() }).Count } else { 0 }
|
||||
Write-Host " -> $recCount records -> $capturePath"
|
||||
}
|
||||
|
||||
Remove-Item Env:\AVEVA_HISTORIAN_RE_CAPTURE -ErrorAction SilentlyContinue
|
||||
Write-Host "`nDecode with: python scripts\decode-event-filter-capture.py" -ForegroundColor Cyan
|
||||
@@ -0,0 +1,113 @@
|
||||
"""Decode the StartEventQuery filter-block encoding (HCAL R1.7).
|
||||
|
||||
Extracts the `pRequestBuff` from the StartEventQuery WriteMessage body in the baseline
|
||||
(no filter) and filtered captures produced by scripts/Capture-EventFilter.ps1, dumps both,
|
||||
and marks where they diverge so the filter predicate (property name / comparison op / value)
|
||||
can be read off the empty-filter baseline.
|
||||
|
||||
Output is diagnostic. Sanitize before copying into docs/.
|
||||
"""
|
||||
import base64
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
CAPDIR = REPO_ROOT / "artifacts" / "reverse-engineering" / "instrumented-wcf-event-filter"
|
||||
PARAM = b"pRequestBuff"
|
||||
OP = b"StartEventQuery"
|
||||
|
||||
|
||||
def extract_request(path):
|
||||
if not path.exists():
|
||||
return None
|
||||
for line in path.open(encoding="utf-8-sig"):
|
||||
if not line.strip():
|
||||
continue
|
||||
rec = json.loads(line)
|
||||
if rec.get("Phase") != "WCF.WriteMessage.Body":
|
||||
continue
|
||||
body = base64.b64decode(rec["Base64"])
|
||||
if OP not in body:
|
||||
continue
|
||||
i = body.find(PARAM)
|
||||
if i < 0:
|
||||
continue
|
||||
i += len(PARAM)
|
||||
for s in range(i, min(i + 16, len(body))):
|
||||
m = body[s]
|
||||
if m == 0x9E:
|
||||
return body[s + 2:s + 2 + body[s + 1]]
|
||||
if m == 0x9F:
|
||||
n = int.from_bytes(body[s + 1:s + 3], "little")
|
||||
return body[s + 3:s + 3 + n]
|
||||
if m == 0xA0:
|
||||
n = int.from_bytes(body[s + 1:s + 3], "little")
|
||||
return body[s + 3:s + 3 + n]
|
||||
return None
|
||||
|
||||
|
||||
def hexdump(label, buf):
|
||||
print(f"=== {label}: {len(buf)} bytes ===")
|
||||
for off in range(0, len(buf), 16):
|
||||
c = buf[off:off + 16]
|
||||
hp = " ".join(f"{x:02X}" for x in c)
|
||||
ap = "".join(chr(x) if 32 <= x < 127 else "." for x in c)
|
||||
print(f" {off:04X} {hp:<48} |{ap}|")
|
||||
print()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
base = extract_request(CAPDIR / "event-filter-capture-baseline-latest.ndjson")
|
||||
filt = extract_request(CAPDIR / "event-filter-capture-filtered-latest.ndjson")
|
||||
if base is None or filt is None:
|
||||
print("Missing capture(s). Run scripts/Capture-EventFilter.ps1 first.")
|
||||
print(f" baseline: {'ok' if base is not None else 'MISSING'}")
|
||||
print(f" filtered: {'ok' if filt is not None else 'MISSING'}")
|
||||
return 1
|
||||
|
||||
hexdump("baseline (no filter) pRequestBuff", base)
|
||||
hexdump("filtered pRequestBuff", filt)
|
||||
|
||||
# First divergence offset.
|
||||
n = min(len(base), len(filt))
|
||||
div = next((i for i in range(n) if base[i] != filt[i]), n)
|
||||
print(f"== First divergence at offset 0x{div:04X} (lenBase={len(base)} lenFilt={len(filt)}) ==")
|
||||
print(" Filtered bytes from divergence (the inserted filter block):")
|
||||
tail = filt[div:]
|
||||
for off in range(0, len(tail), 16):
|
||||
c = tail[off:off + 16]
|
||||
hp = " ".join(f"{x:02X}" for x in c)
|
||||
ap = "".join(chr(x) if 32 <= x < 127 else "." for x in c)
|
||||
print(f" {div + off:04X} {hp:<48} |{ap}|")
|
||||
|
||||
print("\n== Strings in filtered buffer ==")
|
||||
for enc, label in ((b"ascii", "ASCII"), (None, "UTF-16LE")):
|
||||
if enc == b"ascii":
|
||||
cur, start = [], 0
|
||||
for i, x in enumerate(filt):
|
||||
if 32 <= x < 127:
|
||||
if not cur:
|
||||
start = i
|
||||
cur.append(chr(x))
|
||||
else:
|
||||
if len(cur) >= 3:
|
||||
print(f" {label} 0x{start:04X} {''.join(cur)!r}")
|
||||
cur = []
|
||||
else:
|
||||
i = 0
|
||||
while i < len(filt) - 1:
|
||||
j, chars = i, []
|
||||
while j < len(filt) - 1 and 32 <= filt[j] < 127 and filt[j + 1] == 0:
|
||||
chars.append(chr(filt[j]))
|
||||
j += 2
|
||||
if len(chars) >= 3:
|
||||
print(f" {label} 0x{i:04X} {''.join(chars)!r}")
|
||||
i = j
|
||||
else:
|
||||
i += 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -90,7 +90,25 @@ public sealed class HistorianClient : IAsyncDisposable
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ValidateTimeRange(startUtc, endUtc);
|
||||
return _protocol.ReadEventsAsync(startUtc, endUtc, cancellationToken);
|
||||
return _protocol.ReadEventsAsync(startUtc, endUtc, filter: null, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads events in the time window, server-filtered by a single predicate
|
||||
/// (<paramref name="filter"/>) — e.g. <c>Type Equal "User.Write"</c> or
|
||||
/// <c>Area Contains "Tank"</c>. The historian applies the filter and returns only matching
|
||||
/// events. Filtering is a real server-side operation (live-verified: a non-matching predicate
|
||||
/// returns zero events). Single string-valued predicates only; see <see cref="HistorianEventFilter"/>.
|
||||
/// </summary>
|
||||
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(
|
||||
DateTime startUtc,
|
||||
DateTime endUtc,
|
||||
HistorianEventFilter filter,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(filter);
|
||||
ValidateTimeRange(startUtc, endUtc);
|
||||
return _protocol.ReadEventsAsync(startUtc, endUtc, filter, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
namespace AVEVA.Historian.Client.Models;
|
||||
|
||||
/// <summary>
|
||||
/// Comparison operator for a <see cref="HistorianEventFilter"/>. Values mirror the native
|
||||
/// <c>ArchestrA.HistorianComparisionType</c> ordinals and travel on the wire as a UInt16.
|
||||
/// </summary>
|
||||
public enum HistorianEventComparison : ushort
|
||||
{
|
||||
Equal = 0,
|
||||
NotEqual = 1,
|
||||
LessThan = 2,
|
||||
NotLessThan = 3,
|
||||
GreaterThan = 4,
|
||||
NotGreaterThan = 5,
|
||||
LessThanEqual = 6,
|
||||
NotLessThanEqual = 7,
|
||||
GreaterThanEqual = 8,
|
||||
NotGreaterThanEqual = 9,
|
||||
Begins = 10,
|
||||
NotBegins = 11,
|
||||
Contains = 12,
|
||||
NotContains = 13,
|
||||
Exists = 14,
|
||||
NotExists = 15,
|
||||
EndWith = 16,
|
||||
NotEndWith = 17,
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A single server-side event filter predicate: <c>PropertyName Comparison Value</c>
|
||||
/// (e.g. <c>Type Equal "User.Write"</c>, <c>Area Contains "Tank"</c>). Applied to
|
||||
/// <c>ReadEventsAsync</c>; the server returns only events whose named property satisfies the
|
||||
/// comparison. For <see cref="HistorianEventComparison.Exists"/> /
|
||||
/// <see cref="HistorianEventComparison.NotExists"/> the value is ignored but still required by
|
||||
/// the wire format (pass any non-null string).
|
||||
/// </summary>
|
||||
public sealed record HistorianEventFilter(
|
||||
string PropertyName,
|
||||
HistorianEventComparison Comparison,
|
||||
string Value);
|
||||
@@ -42,10 +42,10 @@ internal sealed class Historian2020ProtocolDialect
|
||||
return Missing<HistorianBlock>("StartBlockRetrievalQuery", cancellationToken);
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken)
|
||||
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, HistorianEventFilter? filter, CancellationToken cancellationToken)
|
||||
{
|
||||
HistorianWcfEventOrchestrator orchestrator = new(_options);
|
||||
return orchestrator.ReadEventsAsync(startUtc, endUtc, cancellationToken);
|
||||
return orchestrator.ReadEventsAsync(startUtc, endUtc, filter, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<HistorianConnectionStatus> GetConnectionStatusAsync(CancellationToken cancellationToken)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using AVEVA.Historian.Client.Models;
|
||||
|
||||
namespace AVEVA.Historian.Client.Wcf;
|
||||
|
||||
@@ -7,15 +8,17 @@ internal static class HistorianEventQueryProtocol
|
||||
{
|
||||
public const ushort QueryRequestTypeEvent = 3;
|
||||
|
||||
public static IReadOnlyList<HistorianEventQueryAttempt> CreateStartEventQueryAttempts(DateTime startUtc, DateTime endUtc, uint eventCount)
|
||||
public static IReadOnlyList<HistorianEventQueryAttempt> CreateStartEventQueryAttempts(
|
||||
DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter = null)
|
||||
{
|
||||
List<HistorianEventQueryAttempt> attempts = [];
|
||||
attempts.Add(CreateNativeEmptyFilterAttempt(startUtc, endUtc, eventCount));
|
||||
attempts.Add(CreateNativeFilterAttempt(startUtc, endUtc, eventCount, filter));
|
||||
|
||||
return attempts;
|
||||
}
|
||||
|
||||
private static HistorianEventQueryAttempt CreateNativeEmptyFilterAttempt(DateTime startUtc, DateTime endUtc, uint eventCount)
|
||||
private static HistorianEventQueryAttempt CreateNativeFilterAttempt(
|
||||
DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter)
|
||||
{
|
||||
using MemoryStream stream = new();
|
||||
using BinaryWriter writer = new(stream, Encoding.Unicode, leaveOpen: true);
|
||||
@@ -27,7 +30,14 @@ internal static class HistorianEventQueryProtocol
|
||||
writer.Write(0u);
|
||||
writer.Write((ushort)0);
|
||||
writer.Write((ushort)1);
|
||||
WriteNativeEmptyFilterBlock(writer);
|
||||
if (filter is null)
|
||||
{
|
||||
WriteNativeEmptyFilterBlock(writer);
|
||||
}
|
||||
else
|
||||
{
|
||||
WriteFilterBlock(writer, filter);
|
||||
}
|
||||
writer.Write(65_536u);
|
||||
WriteHistorianString(writer, "UTC");
|
||||
WriteMetadataNamespace(writer);
|
||||
@@ -35,12 +45,64 @@ internal static class HistorianEventQueryProtocol
|
||||
|
||||
byte[] request = stream.ToArray();
|
||||
return new HistorianEventQueryAttempt(
|
||||
"native-empty-filter-version5",
|
||||
filter is null ? "native-empty-filter-version5" : "native-filter-version5",
|
||||
5,
|
||||
request,
|
||||
Convert.ToHexString(SHA256.HashData(request)).ToLowerInvariant());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Filter block carrying one predicate (property / comparison / value), decoded byte-for-byte
|
||||
/// from instrument-wcf-writemessage captures of <c>EventQuery.AddEventFilter</c> (Equal vs
|
||||
/// Contains diffed to isolate the operator field). Layout:
|
||||
/// <code>
|
||||
/// UInt16 0 // header word (also written by the empty-filter block)
|
||||
/// UInt32 filterCount = 1
|
||||
/// UInt32 conditionCount = 1
|
||||
/// UInt32 propertyNameCharCount
|
||||
/// propertyName (UTF-16LE)
|
||||
/// UInt32 1 // constant (per-condition marker; observed = 1)
|
||||
/// UInt16 comparison // HistorianComparisionType ordinal
|
||||
/// UInt32 1 // constant (value marker; observed = 1)
|
||||
/// value: 0x09 LEN 0x00 LEN×ASCII // compact-ASCII string (same as event/CTagMetadata)
|
||||
/// Byte 0 // block terminator
|
||||
/// </code>
|
||||
/// Single-predicate only — multi-filter (OR) / multi-condition (AND) framing is not yet
|
||||
/// fully captured. String values only; the value is always emitted (even for Exists).
|
||||
/// </summary>
|
||||
private static void WriteFilterBlock(BinaryWriter writer, HistorianEventFilter filter)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrEmpty(filter.PropertyName);
|
||||
ArgumentNullException.ThrowIfNull(filter.Value);
|
||||
|
||||
writer.Write((ushort)0);
|
||||
writer.Write(1u); // filterCount
|
||||
writer.Write(1u); // conditionCount
|
||||
writer.Write((uint)filter.PropertyName.Length); // char count
|
||||
writer.Write(Encoding.Unicode.GetBytes(filter.PropertyName));
|
||||
writer.Write(1u); // constant marker
|
||||
writer.Write((ushort)filter.Comparison);
|
||||
writer.Write(1u); // constant marker
|
||||
WriteCompactAsciiString(writer, filter.Value);
|
||||
writer.Write((byte)0); // block terminator
|
||||
}
|
||||
|
||||
/// <summary>Compact ASCII string: <c>0x09 LEN 0x00 LEN×ASCII bytes</c>.</summary>
|
||||
private static void WriteCompactAsciiString(BinaryWriter writer, string value)
|
||||
{
|
||||
byte[] ascii = Encoding.ASCII.GetBytes(value);
|
||||
if (ascii.Length > byte.MaxValue)
|
||||
{
|
||||
throw new ProtocolEvidenceMissingException(
|
||||
$"Event filter value '{value}' exceeds the single-byte length captured for the compact-string encoding.");
|
||||
}
|
||||
|
||||
writer.Write((byte)0x09);
|
||||
writer.Write((byte)ascii.Length);
|
||||
writer.Write((byte)0);
|
||||
writer.Write(ascii);
|
||||
}
|
||||
|
||||
private static HistorianEventQueryAttempt CreateAttempt(
|
||||
string shape,
|
||||
ushort version,
|
||||
|
||||
@@ -66,6 +66,7 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
public async IAsyncEnumerable<HistorianEvent> ReadEventsAsync(
|
||||
DateTime startUtc,
|
||||
DateTime endUtc,
|
||||
HistorianEventFilter? filter,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_options.IntegratedSecurity && string.IsNullOrEmpty(_options.UserName))
|
||||
@@ -77,7 +78,7 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
IReadOnlyList<HistorianEvent> events = await Task.Run(
|
||||
() => RunEventChain(startUtc, endUtc, cancellationToken),
|
||||
() => RunEventChain(startUtc, endUtc, filter, cancellationToken),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
foreach (HistorianEvent evt in events)
|
||||
@@ -143,7 +144,7 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
return sent;
|
||||
}
|
||||
|
||||
private List<HistorianEvent> RunEventChain(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken)
|
||||
private List<HistorianEvent> RunEventChain(DateTime startUtc, DateTime endUtc, HistorianEventFilter? filter, CancellationToken cancellationToken)
|
||||
{
|
||||
Guid contextKey = Guid.NewGuid();
|
||||
var (histBinding, histEndpoint, retrBinding, retrEndpoint) = HistorianWcfBindingFactory.CreateBindingPair(_options);
|
||||
@@ -155,7 +156,7 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedReadOnlyConnectionMode,
|
||||
additionalSetup: (historyChannel, context) =>
|
||||
AddCmEventTagViaAddT(historyChannel, context, auxBinding, statusEndpoint, transactionEndpoint, retrBinding, retrEndpoint));
|
||||
return RunEventQuery(retrBinding, retrEndpoint, clientHandle, startUtc, endUtc, cancellationToken);
|
||||
return RunEventQuery(retrBinding, retrEndpoint, clientHandle, startUtc, endUtc, filter, cancellationToken);
|
||||
}
|
||||
|
||||
private List<HistorianEvent> RunEventQuery(
|
||||
@@ -164,6 +165,7 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
uint clientHandle,
|
||||
DateTime startUtc,
|
||||
DateTime endUtc,
|
||||
HistorianEventFilter? filter,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ChannelFactory<IRetrievalServiceContract4> factory = new(binding, retrievalEndpoint);
|
||||
@@ -187,7 +189,8 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
IReadOnlyList<HistorianEventQueryAttempt> attempts = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
|
||||
startUtc.ToUniversalTime(),
|
||||
endUtc.ToUniversalTime(),
|
||||
eventCount: 5);
|
||||
eventCount: 5,
|
||||
filter);
|
||||
byte[] requestBuffer = attempts[0].RequestBuffer;
|
||||
|
||||
uint queryHandle = 0;
|
||||
|
||||
@@ -36,7 +36,7 @@ public sealed class EventChainDiagnosticTests
|
||||
|
||||
int observed = 0;
|
||||
AVEVA.Historian.Client.Models.HistorianEvent? firstEvent = null;
|
||||
await foreach (var evt in orchestrator.ReadEventsAsync(startUtc, endUtc, CancellationToken.None))
|
||||
await foreach (var evt in orchestrator.ReadEventsAsync(startUtc, endUtc, filter: null, CancellationToken.None))
|
||||
{
|
||||
observed++;
|
||||
firstEvent ??= evt;
|
||||
|
||||
@@ -745,6 +745,53 @@ public sealed class HistorianClientIntegrationTests
|
||||
Assert.False(string.IsNullOrWhiteSpace(metadata.EngineeringUnit));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadEventsAsync_WithFilter_IsHonoredByServer()
|
||||
{
|
||||
string? host = Environment.GetEnvironmentVariable("HISTORIAN_HOST");
|
||||
if (string.IsNullOrWhiteSpace(host) || !string.Equals(host, "localhost", StringComparison.OrdinalIgnoreCase) || !OperatingSystem.IsWindows())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
HistorianClient client = new(new HistorianClientOptions
|
||||
{
|
||||
Host = host,
|
||||
IntegratedSecurity = true,
|
||||
Transport = HistorianTransport.LocalPipe
|
||||
});
|
||||
|
||||
DateTime endUtc = DateTime.UtcNow;
|
||||
DateTime startUtc = endUtc - TimeSpan.FromDays(30);
|
||||
|
||||
// A predicate that matches nothing must return zero events — proving the server applies
|
||||
// the filter (not inert), unlike e.g. the analog-summary knobs.
|
||||
List<AVEVA.Historian.Client.Models.HistorianEvent> noMatch = [];
|
||||
await foreach (var evt in client.ReadEventsAsync(startUtc, endUtc,
|
||||
new AVEVA.Historian.Client.Models.HistorianEventFilter("Type",
|
||||
AVEVA.Historian.Client.Models.HistorianEventComparison.Equal, "ZZZ_NoSuchEventType"),
|
||||
CancellationToken.None))
|
||||
{
|
||||
noMatch.Add(evt);
|
||||
}
|
||||
Assert.Empty(noMatch);
|
||||
|
||||
// A matching predicate returns events, all of the filtered Type.
|
||||
List<AVEVA.Historian.Client.Models.HistorianEvent> matched = [];
|
||||
await foreach (var evt in client.ReadEventsAsync(startUtc, endUtc,
|
||||
new AVEVA.Historian.Client.Models.HistorianEventFilter("Type",
|
||||
AVEVA.Historian.Client.Models.HistorianEventComparison.Equal, "User.Write"),
|
||||
CancellationToken.None))
|
||||
{
|
||||
matched.Add(evt);
|
||||
}
|
||||
|
||||
// Requires User.Write events in the window (present on a working Historian). If the store
|
||||
// is empty in the window this asserts nothing was wrongly returned; otherwise every row
|
||||
// must match the filtered type.
|
||||
Assert.All(matched, evt => Assert.Equal("User.Write", evt.Type));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SendEventAsync_AgainstLocalHistorian_AcceptedByServer()
|
||||
{
|
||||
|
||||
@@ -1,9 +1,63 @@
|
||||
using AVEVA.Historian.Client.Models;
|
||||
using AVEVA.Historian.Client.Wcf;
|
||||
|
||||
namespace AVEVA.Historian.Client.Tests;
|
||||
|
||||
public sealed class WcfEventQueryProtocolTests
|
||||
{
|
||||
// Filter block (offset 0x1E into pRequestBuff) captured from a native
|
||||
// EventQuery.AddEventFilter("Area", Equal, "RetestFilterArea") StartEventQuery, via
|
||||
// instrument-wcf-writemessage.
|
||||
private const string CaptureFilterBlockHex =
|
||||
"000001000000010000000400000041007200650061000100000000000100000009100052657465737446696c7465724172656100";
|
||||
private const int FilterBlockOffset = 0x1E;
|
||||
|
||||
[Fact]
|
||||
public void SerializerMatchesInstrumentedNativeEventFilterBlock()
|
||||
{
|
||||
HistorianEventQueryAttempt attempt = Assert.Single(HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
|
||||
new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
||||
new DateTime(2026, 1, 2, 0, 0, 0, DateTimeKind.Utc),
|
||||
5,
|
||||
new HistorianEventFilter("Area", HistorianEventComparison.Equal, "RetestFilterArea")));
|
||||
|
||||
byte[] expectedBlock = Convert.FromHexString(CaptureFilterBlockHex);
|
||||
byte[] actualBlock = attempt.RequestBuffer[FilterBlockOffset..(FilterBlockOffset + expectedBlock.Length)];
|
||||
|
||||
Assert.Equal("native-filter-version5", attempt.Name);
|
||||
Assert.Equal(expectedBlock, actualBlock);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void FilterBlockEncodesComparisonOperatorAsUInt16()
|
||||
{
|
||||
// Equal = 0, Contains = 12 — the op field is the only byte that differs between them
|
||||
// (offset 0x38 in pRequestBuff = 0x1A into the filter block).
|
||||
byte[] equal = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
|
||||
new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
||||
new DateTime(2026, 1, 2, 0, 0, 0, DateTimeKind.Utc),
|
||||
5, new HistorianEventFilter("Area", HistorianEventComparison.Equal, "X"))[0].RequestBuffer;
|
||||
byte[] contains = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
|
||||
new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
||||
new DateTime(2026, 1, 2, 0, 0, 0, DateTimeKind.Utc),
|
||||
5, new HistorianEventFilter("Area", HistorianEventComparison.Contains, "X"))[0].RequestBuffer;
|
||||
|
||||
Assert.Equal(0x38, FilterBlockOffset + 0x1A);
|
||||
Assert.Equal(0, BitConverter.ToUInt16(equal, 0x38));
|
||||
Assert.Equal(12, BitConverter.ToUInt16(contains, 0x38));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NullFilterStillProducesTheEmptyFilterBuffer()
|
||||
{
|
||||
HistorianEventQueryAttempt withNull = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
|
||||
new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
||||
new DateTime(2026, 1, 1, 0, 1, 0, DateTimeKind.Utc), 3)[0];
|
||||
|
||||
Assert.Equal("native-empty-filter-version5", withNull.Name);
|
||||
Assert.Equal(65, withNull.RequestBuffer.Length);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SerializerMatchesInstrumentedNativeEventRequest()
|
||||
{
|
||||
|
||||
@@ -84,6 +84,31 @@ internal static class Program
|
||||
return 0;
|
||||
}
|
||||
|
||||
string? dumpTypeName = GetArg(args, "--dump-type-members");
|
||||
if (dumpTypeName is not null)
|
||||
{
|
||||
Type dumpType = GetType(assembly, dumpTypeName);
|
||||
if (dumpType.IsEnum)
|
||||
{
|
||||
var values = Enum.GetValues(dumpType).Cast<object>()
|
||||
.Select(v => $"{v} = {Convert.ToInt64(v)}").OrderBy(s => s).ToArray();
|
||||
Console.WriteLine(Serialize(new { Type = dumpType.FullName, EnumValues = values }));
|
||||
return 0;
|
||||
}
|
||||
BindingFlags df = BindingFlags.Instance | BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic;
|
||||
Console.WriteLine(Serialize(new
|
||||
{
|
||||
Type = dumpType.FullName,
|
||||
Properties = dumpType.GetProperties(df).Select(p => $"{p.PropertyType.Name} {p.Name}").OrderBy(s => s).ToArray(),
|
||||
Fields = dumpType.GetFields(df).Select(f => $"{f.FieldType.Name} {f.Name}").OrderBy(s => s).ToArray(),
|
||||
Methods = dumpType.GetMethods(df)
|
||||
.Where(m => !m.IsSpecialName)
|
||||
.Select(m => $"{m.ReturnType.Name} {m.Name}({string.Join(", ", m.GetParameters().Select(p => p.ParameterType.Name))})")
|
||||
.OrderBy(s => s).ToArray(),
|
||||
}));
|
||||
return 0;
|
||||
}
|
||||
|
||||
Type accessType = GetType(assembly, "ArchestrA.HistorianAccess");
|
||||
Type connectionArgsType = GetType(assembly, "ArchestrA.HistorianConnectionArgs");
|
||||
Type connectionStatusType = GetType(assembly, "ArchestrA.HistorianConnectionStatus");
|
||||
@@ -224,6 +249,41 @@ internal static class Program
|
||||
SetProperty(queryArgs, "EventOrder", Enum.Parse(eventOrderType, "Ascending"));
|
||||
snapshots["EventQueryArgsBeforeStart"] = SnapshotObject(queryArgs);
|
||||
|
||||
// R1.7 event-filter capture: --event-filter "Property:Op:Value" (repeatable via ';').
|
||||
// Calls EventQuery.AddEventFilter(name, HistorianComparisionType, value, out err) so the
|
||||
// filter predicate rides StartEventQuery's request buffer for instrument-wcf capture.
|
||||
string? eventFilterSpec = GetArg(args, "--event-filter");
|
||||
if (!string.IsNullOrWhiteSpace(eventFilterSpec))
|
||||
{
|
||||
Type comparisonType = GetType(assembly, "ArchestrA.HistorianComparisionType");
|
||||
MethodInfo addFilterMethod = queryType.GetMethod("AddEventFilter",
|
||||
new[] { typeof(string), comparisonType, typeof(object), errorType.MakeByRefType() })
|
||||
?? throw new MissingMethodException("EventQuery.AddEventFilter");
|
||||
foreach (string clause in eventFilterSpec!.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries))
|
||||
{
|
||||
string[] parts = clause.Split(new[] { ':' }, 3);
|
||||
if (parts.Length < 3)
|
||||
{
|
||||
throw new ArgumentException($"--event-filter clause '{clause}' must be Property:Op:Value.");
|
||||
}
|
||||
|
||||
object filterError = Activator.CreateInstance(errorType)!;
|
||||
object?[] addFilterArgs = [parts[0], Enum.Parse(comparisonType, parts[1], ignoreCase: true), parts[2], filterError];
|
||||
object addFilterResult = addFilterMethod.Invoke(query, addFilterArgs)!;
|
||||
filterError = addFilterArgs[3]!;
|
||||
rows.Add(new
|
||||
{
|
||||
Kind = "AddEventFilter",
|
||||
Property = parts[0],
|
||||
Op = parts[1],
|
||||
Value = parts[2],
|
||||
FilterId = addFilterResult,
|
||||
ErrorDescription = GetPropertyText(filterError, "ErrorDescription"),
|
||||
});
|
||||
}
|
||||
snapshots["EventQueryAfterAddFilter"] = SnapshotObject(query);
|
||||
}
|
||||
|
||||
startError = Activator.CreateInstance(errorType)!;
|
||||
MethodInfo startMethod = queryType.GetMethod("StartQuery", new[] { eventQueryArgsType, errorType.MakeByRefType() })
|
||||
?? throw new MissingMethodException("EventQuery.StartQuery");
|
||||
|
||||
Reference in New Issue
Block a user