feat(grpc-events): v6 StartEventQuery request + capture-event harness scenario

Captured the stock 2023 R2 client doing a gRPC event read (50 rows flowed) to
resolve the open "gRPC event ROW retrieval returns zero rows" item. Two captured
differences from our SDK's path; this lands the first (necessary) one plus the
capture tooling.

- HistorianEventQueryProtocol.CreateStartEventQueryAttempts: add a `version`
  parameter (default 5 = the 2020 WCF format, unchanged). The gRPC event
  orchestrator now opts into version 6 — the leading `06` plus a 5-byte trailing
  zero pad — which is the envelope the stock 2023 R2 client sends. The two
  buffers are otherwise byte-identical (filter block, UTC string, metadata
  namespace). Golden test Version6EmptyFilterMatchesCapturedGrpcEnvelope pins it.
- Grpc2023CaptureHarness: new `capture-event` scenario drives HistorianAccess
  over an Event-type gRPC connection (CreateEventQuery -> EventQueryArgs ->
  StartQuery -> MoveNext) so the wide-net instrument-grpc-nonstream rewrite dumps
  StartEventQuery.requestBuffer + the row result. Hostname defaults sanitized to
  HISTORIAN_GRPC_HOST / "localhost" (removed hardcoded server name).

NECESSARY BUT NOT SUFFICIENT: live validation shows v6 alone does not make rows
flow — the read also requires an Event-type connection, which our SDK's v6 Open2
format cannot express (see the companion docs commit). The gated
ReadEventsAsync_OverGrpc_* test correctly still pins the no-row throw. 322/322
offline tests pass; WCF event path unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
Joseph Doherty
2026-06-22 10:41:15 -04:00
parent d9051ba890
commit dbb5c99c53
4 changed files with 238 additions and 13 deletions
@@ -266,11 +266,17 @@ internal sealed class HistorianGrpcEventOrchestrator
new GrpcRetrieval.GetRetrievalInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken);
HistorianServerVersionGate.Validate(HistorianServiceInterface.Retrieval, retrievalVersion.UiVersion, _options);
// Version 6 envelope: the stock 2023 R2 client sends v6 (the WCF path's v5 request is accepted
// here but is the legacy format). NECESSARY but not alone sufficient — live validation 2026-06-22
// showed rows still don't flow on v6 because the read also requires an EVENT-type connection
// (the stock client opens ConnectionType=Event; our OpenSession opens a Process-style 0x402
// session). See docs/reverse-engineering/grpc-event-query-capture.md "remaining gate".
IReadOnlyList<HistorianEventQueryAttempt> attempts = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
startUtc.ToUniversalTime(),
endUtc.ToUniversalTime(),
eventCount: 5,
filter);
filter,
version: 6);
byte[] requestBuffer = attempts[0].RequestBuffer;
GrpcRetrieval.StartEventQueryResponse startResponse = retrievalClient.StartEventQuery(
@@ -8,22 +8,31 @@ internal static class HistorianEventQueryProtocol
{
public const ushort QueryRequestTypeEvent = 3;
/// <summary>
/// Builds the <c>StartEventQuery</c> <c>pRequestBuff</c>. <paramref name="version"/> selects the
/// envelope revision: <b>5</b> (default) is the native 2020 WCF format used by the WCF event
/// orchestrator; <b>6</b> is the 2023 R2 gRPC format. The two envelopes are byte-identical except
/// the leading version word and a 5-byte trailing zero pad — captured 2026-06-22 from the stock
/// 2023 R2 client (see <c>docs/reverse-engineering/grpc-event-query-capture.md</c>). The 2023 R2
/// server returns rows only for v6; v5 is accepted (StartEventQuery succeeds) but matches no rows.
/// The filter block in the middle is unchanged across versions.
/// </summary>
public static IReadOnlyList<HistorianEventQueryAttempt> CreateStartEventQueryAttempts(
DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter = null)
DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter = null, ushort version = 5)
{
List<HistorianEventQueryAttempt> attempts = [];
attempts.Add(CreateNativeFilterAttempt(startUtc, endUtc, eventCount, filter));
attempts.Add(CreateNativeFilterAttempt(startUtc, endUtc, eventCount, filter, version));
return attempts;
}
private static HistorianEventQueryAttempt CreateNativeFilterAttempt(
DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter)
DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter, ushort version)
{
using MemoryStream stream = new();
using BinaryWriter writer = new(stream, Encoding.Unicode, leaveOpen: true);
writer.Write((ushort)5);
writer.Write(version);
writer.Write(startUtc.ToFileTimeUtc());
writer.Write(endUtc.ToFileTimeUtc());
writer.Write(eventCount);
@@ -43,10 +52,19 @@ internal static class HistorianEventQueryProtocol
WriteMetadataNamespace(writer);
writer.Write(0u);
// Version 6 (2023 R2 gRPC) appends a 5-byte trailing zero pad after the v5 terminal — the only
// envelope delta from v5 besides the version word. Captured live: the v6 buffer is the v5 buffer
// (byte 0 = 6) plus these 5 bytes, and is the form the 2023 R2 server returns event rows for.
if (version >= 6)
{
writer.Write(0u);
writer.Write((byte)0);
}
byte[] request = stream.ToArray();
return new HistorianEventQueryAttempt(
filter is null ? "native-empty-filter-version5" : "native-filter-version5",
5,
filter is null ? $"native-empty-filter-version{version}" : $"native-filter-version{version}",
version,
request,
Convert.ToHexString(SHA256.HashData(request)).ToLowerInvariant());
}
@@ -73,6 +73,33 @@ public sealed class WcfEventQueryProtocolTests
Assert.Equal("6b955b02087047a3199a8c74f3eee85c3b49aaa29b05de12eff2dd536f2da0d5", attempt.RequestSha256);
}
[Fact]
public void Version6EmptyFilterMatchesCapturedGrpcEnvelope()
{
// Captured 2026-06-22 from the stock 2023 R2 client (docs/reverse-engineering/grpc-event-query-capture.md):
// the v6 StartEventQuery request is byte-identical to the v5 buffer except byte 0 (version 6) and a
// 5-byte trailing zero pad (70 vs 65 bytes). The 2023 R2 server returns event rows only for v6.
DateTime start = new DateTime(2026, 4, 25, 14, 39, 36, 800, DateTimeKind.Utc).AddTicks(1646);
DateTime end = new DateTime(2026, 5, 2, 14, 39, 36, 800, DateTimeKind.Utc).AddTicks(1646);
byte[] v5 = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(start, end, 3)[0].RequestBuffer;
HistorianEventQueryAttempt v6Attempt = Assert.Single(
HistorianEventQueryProtocol.CreateStartEventQueryAttempts(start, end, 3, filter: null, version: 6));
byte[] v6 = v6Attempt.RequestBuffer;
Assert.Equal("native-empty-filter-version6", v6Attempt.Name);
Assert.Equal(6, v6Attempt.Version);
Assert.Equal(70, v6.Length);
Assert.Equal([0x06, 0x00], v6[..2]);
// v6 == v5 with byte 0 -> 6 and 5 trailing zero bytes appended.
byte[] expected = new byte[70];
Array.Copy(v5, expected, v5.Length);
expected[0] = 0x06;
Assert.Equal(expected, v6);
Assert.Equal([0x00, 0x00, 0x00, 0x00, 0x00], v6[^5..]);
}
[Fact]
public void NativeEmptyFilterAttemptMatchesDecompiledSaveOrder()
{
@@ -90,8 +90,10 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
return CaptureWrite(managedDll, args);
case "delete-tag":
return DeleteTag(managedDll, args);
case "capture-event":
return CaptureEvent(managedDll, args);
default:
Console.Error.WriteLine($"Unknown scenario '{scenario}'. Supported: load-check, connect, capture-write, delete-tag.");
Console.Error.WriteLine($"Unknown scenario '{scenario}'. Supported: load-check, connect, capture-write, delete-tag, capture-event.");
return 1;
}
}
@@ -114,7 +116,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
Type tagStatusType = Req(asm, "ArchestrA.HistorianTagStatus");
Type tagStatusListType = Req(asm, "ArchestrA.HistorianTagStatusList");
string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03";
string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost";
int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565;
string certName = GetOption(args, "--cert") ?? server;
string tagName = GetOption(args, "--tag") ?? "SdkM3CaptureSandbox";
@@ -201,7 +203,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
/// SendValues (the actual wire push) only runs with --commit. Run with --grpc-rewrite pointing
/// at the instrumented copy and AVEVA_HISTORIAN_RE_CAPTURE set to the output file.
/// Usage: capture-write --tag SdkM3CaptureSandbox [--create] [--commit]
/// [--server WONDER-SQL-VD03] [--port 32565] [--cert WONDER-SQL-VD03] [--value 123.0]
/// [--server <host>] [--port 32565] [--cert <host>] [--value 123.0]
/// </summary>
private static int CaptureWrite(string managedDll, string[] args)
{
@@ -221,7 +223,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
Type listType = Req(asm, "ArchestrA.HistorianDataValueList");
Type categoryEnum = Req(asm, "ArchestrA.HistorianDataCategory");
string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03";
string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost";
int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565;
string certName = GetOption(args, "--cert") ?? server;
string tagName = GetOption(args, "--tag") ?? "SdkM3CaptureSandbox";
@@ -429,12 +431,184 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
result = m.Invoke(target, a);
}
/// <summary>
/// Drives the native 2023 R2 client through a read-only gRPC EVENT query so the IL-rewritten
/// GrpcRetrievalClient dumps the uncaptured event buffers: StartEventQuery.requestBuffer (the
/// empty-filter request shape our SDK's CreateNativeEmptyFilterAttempt is being compared against)
/// and GetNextEventQueryResultBuffer.result (the row buffer — proves rows flow when driven right).
///
/// CRITICAL: the connection is opened with ConnectionType=Event (NOT Process). CreateEventQuery()
/// returns null unless IsEventConnectionRequested() — the native event read runs on ConnectionIndex 1,
/// a separate connection from the process/data path. This is the prime suspect for why the SDK's
/// gRPC empty-filter query returns zero rows despite the server holding events.
///
/// Sequence: OpenConnection(Event, read-only, gRPC) -> CreateEventQuery() ->
/// EventQueryArgs{StartDateTime,EndDateTime,EventCount} -> EventQuery.StartQuery(args) ->
/// loop EventQuery.MoveNext()/QueryResult -> EventQuery.EndQuery() -> CloseConnection.
/// Run with --grpc-rewrite pointing at the instrumented Archestra.Historian.GrpcClient.dll and
/// AVEVA_HISTORIAN_RE_CAPTURE set to the output NDJSON. Read-only — non-destructive.
/// Usage: capture-event [--server <host>] [--port 32565] [--cert <host>]
/// [--lookback-hours 720] [--max-events 50] [--integrated]
/// </summary>
private static int CaptureEvent(string managedDll, string[] args)
{
Assembly asm = Assembly.LoadFrom(managedDll);
Type accessType = Req(asm, "ArchestrA.HistorianAccess");
Type connArgsType = Req(asm, "ArchestrA.HistorianConnectionArgs");
Type connModeType = Req(asm, "ArchestrA.HistorianConnectionMode");
Type connTypeType = Req(asm, "ArchestrA.HistorianConnectionType");
Type errorType = Req(asm, "ArchestrA.HistorianAccessError");
Type statusType = Req(asm, "ArchestrA.HistorianConnectionStatus");
Type certInfoType = Req(asm, "ArchestrA.CertificateInfo");
Type secModeType = Req(asm, "ArchestrA.HistorianSecurityMode");
Type eventQueryType = Req(asm, "ArchestrA.EventQuery");
Type eventArgsType = Req(asm, "ArchestrA.EventQueryArgs");
string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost";
int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565;
string certName = GetOption(args, "--cert") ?? server;
int lookbackHours = int.TryParse(GetOption(args, "--lookback-hours"), out int lh) ? lh : 720;
int maxEvents = int.TryParse(GetOption(args, "--max-events"), out int me) ? me : 50;
bool integrated = args.Contains("--integrated");
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
if (!integrated && string.IsNullOrEmpty(user))
{
Console.Error.WriteLine("Set HISTORIAN_USER/HISTORIAN_PASSWORD or pass --integrated.");
return 1;
}
object connArgs = Activator.CreateInstance(connArgsType)!;
SetProp(connArgs, "ServerName", server);
SetProp(connArgs, "TcpPort", checked((ushort)port));
SetProp(connArgs, "ConnectionMode", Enum.Parse(connModeType, "Historian")); // 2 = gRPC
SetProp(connArgs, "ConnectionType", Enum.Parse(connTypeType, "Event")); // EVENT connection
SetProp(connArgs, "ReadOnly", true);
SetProp(connArgs, "IntegratedSecurity", integrated);
SetProp(connArgs, "AllowUnTrustedConnection", true);
if (!integrated)
{
SetProp(connArgs, "UserName", user!);
SetProp(connArgs, "Password", password ?? string.Empty);
}
object certInfo = Activator.CreateInstance(certInfoType)!;
TrySetProp(certInfo, "CertificateName", certName);
TrySetProp(certInfo, "SecurityMode", Enum.Parse(secModeType, "TransportCertificate"));
TrySetProp(connArgs, "SecurityInfo", certInfo);
object access = Activator.CreateInstance(accessType)!;
object error = Activator.CreateInstance(errorType)!;
object?[] openArgs = { connArgs, error };
Console.WriteLine($"OpenConnection: server={server} port={port} mode=Historian type=Event cert={certName} integrated={integrated} readonly=true");
bool opened;
try
{
opened = (bool)accessType.GetMethod("OpenConnection", new[] { connArgsType, errorType.MakeByRefType() })!
.Invoke(access, openArgs)!;
}
catch (TargetInvocationException tie)
{
Console.Error.WriteLine($"OpenConnection threw: {tie.InnerException?.GetType().Name}: {tie.InnerException?.Message}");
return 2;
}
Console.WriteLine($"OpenConnection returned: {opened} err={DescribeError(openArgs[1])}");
if (!opened) { return 2; }
try
{
// Let the event connection (ConnectionIndex 1) come up.
MethodInfo getStatus = accessType.GetMethod("GetConnectionStatus", new[] { statusType.MakeByRefType() })
?? accessType.GetMethods().First(m => m.Name == "GetConnectionStatus" && m.GetParameters().Length == 1);
for (int i = 0; i < 10; i++)
{
object?[] sArgs = { null };
getStatus.Invoke(access, sArgs);
if (ReadBoolProp(sArgs[0], "ConnectedToServer") || !ReadBoolProp(sArgs[0], "Pending")) break;
System.Threading.Thread.Sleep(500);
}
// CreateEventQuery() is non-null only when the connection is an event connection.
MethodInfo createEventQuery = accessType.GetMethod("CreateEventQuery", Type.EmptyTypes)
?? accessType.GetMethods().First(m => m.Name == "CreateEventQuery" && m.GetParameters().Length == 0);
object? eventQuery = createEventQuery.Invoke(access, null);
Console.WriteLine($"CreateEventQuery: {(eventQuery == null ? "NULL (event connection not established!)" : "ok")}");
if (eventQuery == null) { return 3; }
// Build EventQueryArgs over the populated window. Times in UTC.
object eventArgs = Activator.CreateInstance(eventArgsType)!;
DateTime endUtc = DateTime.UtcNow;
DateTime startUtc = endUtc.AddHours(-lookbackHours);
TrySetProp(eventArgs, "StartDateTime", DateTime.SpecifyKind(startUtc, DateTimeKind.Utc));
TrySetProp(eventArgs, "EndDateTime", DateTime.SpecifyKind(endUtc, DateTimeKind.Utc));
TrySetProp(eventArgs, "EventCount", checked((uint)maxEvents));
Console.WriteLine($"EventQueryArgs: start={startUtc:o} end={endUtc:o} eventCount={maxEvents}");
// StartQuery -> triggers GrpcRetrievalClient.StartEventQuery (requestBuffer CAPTURED).
MethodInfo startQuery = eventQueryType.GetMethods()
.First(m => m.Name == "StartQuery" && m.GetParameters().Length == 2);
object?[] startArgs = { eventArgs, Activator.CreateInstance(errorType) };
bool started = (bool)startQuery.Invoke(eventQuery, startArgs)!;
Console.WriteLine($"StartQuery: {started} err={DescribeError(startArgs[1])}");
// Poll rows -> triggers GetNextEventQueryResultBuffer (result buffer CAPTURED).
MethodInfo moveNext = eventQueryType.GetMethods()
.First(m => m.Name == "MoveNext" && m.GetParameters().Length == 1);
PropertyInfo? queryResult = eventQueryType.GetProperty("QueryResult");
int rows = 0;
while (rows < maxEvents)
{
object?[] mnArgs = { Activator.CreateInstance(errorType) };
bool more;
try { more = (bool)moveNext.Invoke(eventQuery, mnArgs)!; }
catch (TargetInvocationException tie)
{
Console.WriteLine($"MoveNext threw: {tie.InnerException?.GetType().Name}: {tie.InnerException?.Message}");
break;
}
if (!more)
{
Console.WriteLine($"MoveNext: end after {rows} row(s) err={DescribeError(mnArgs[0])}");
break;
}
rows++;
if (rows <= 3 && queryResult != null)
{
// Print only the event TYPE + time (non-identity) to confirm rows flow.
object? res = queryResult.GetValue(eventQuery);
string typ = res?.GetType().GetProperty("Type")?.GetValue(res)?.ToString() ?? "?";
object? t = res?.GetType().GetProperty("EventTime")?.GetValue(res);
Console.WriteLine($" row {rows}: Type={typ} EventTime={t}");
}
}
Console.WriteLine($"Rows iterated: {rows}");
MethodInfo? endQuery = eventQueryType.GetMethods()
.FirstOrDefault(m => m.Name == "EndQuery" && m.GetParameters().Length == 1);
if (endQuery != null)
{
object?[] eqArgs = { Activator.CreateInstance(errorType) };
endQuery.Invoke(eventQuery, eqArgs);
}
Console.WriteLine(rows > 0 ? "CAPTURE-EVENT: PASS (rows flowed)" : "CAPTURE-EVENT: request captured (zero rows)");
return 0;
}
finally
{
try
{
MethodInfo? close = accessType.GetMethod("CloseConnection", new[] { errorType.MakeByRefType() });
if (close != null) close.Invoke(access, new object?[] { Activator.CreateInstance(errorType) });
}
catch { /* best-effort */ }
}
}
/// <summary>
/// Read-only gRPC connect probe: opens a 2023 R2 Historian (mode=Historian) connection via the
/// native client and reports the resulting connection status. Proves the mixed-mode client can
/// reach the live server over gRPC from this box — the foundation for the write-capture step.
/// Reads creds from HISTORIAN_USER / HISTORIAN_PASSWORD (explicit) or uses IntegratedSecurity.
/// Usage: connect --server WONDER-SQL-VD03 [--port 32565] [--cert WONDER-SQL-VD03] [--integrated]
/// Usage: connect --server <host> [--port 32565] [--cert <host>] [--integrated]
/// </summary>
private static int Connect(string managedDll, string[] args)
{
@@ -448,7 +622,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
Type certInfoType = Req(asm, "ArchestrA.CertificateInfo");
Type secModeType = Req(asm, "ArchestrA.HistorianSecurityMode");
string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03";
string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost";
int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565;
string certName = GetOption(args, "--cert") ?? server;
bool integrated = args.Contains("--integrated");