From dbb5c99c5303542ff7629b1c38e50c20d3dc2762 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 22 Jun 2026 10:41:15 -0400 Subject: [PATCH] feat(grpc-events): v6 StartEventQuery request + capture-event harness scenario MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Captured the stock 2023 R2 client doing a gRPC event read (50 rows flowed) to resolve the open "gRPC event ROW retrieval returns zero rows" item. Two captured differences from our SDK's path; this lands the first (necessary) one plus the capture tooling. - HistorianEventQueryProtocol.CreateStartEventQueryAttempts: add a `version` parameter (default 5 = the 2020 WCF format, unchanged). The gRPC event orchestrator now opts into version 6 — the leading `06` plus a 5-byte trailing zero pad — which is the envelope the stock 2023 R2 client sends. The two buffers are otherwise byte-identical (filter block, UTC string, metadata namespace). Golden test Version6EmptyFilterMatchesCapturedGrpcEnvelope pins it. - Grpc2023CaptureHarness: new `capture-event` scenario drives HistorianAccess over an Event-type gRPC connection (CreateEventQuery -> EventQueryArgs -> StartQuery -> MoveNext) so the wide-net instrument-grpc-nonstream rewrite dumps StartEventQuery.requestBuffer + the row result. Hostname defaults sanitized to HISTORIAN_GRPC_HOST / "localhost" (removed hardcoded server name). NECESSARY BUT NOT SUFFICIENT: live validation shows v6 alone does not make rows flow — the read also requires an Event-type connection, which our SDK's v6 Open2 format cannot express (see the companion docs commit). The gated ReadEventsAsync_OverGrpc_* test correctly still pins the no-row throw. 322/322 offline tests pass; WCF event path unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC --- .../Grpc/HistorianGrpcEventOrchestrator.cs | 8 +- .../Wcf/HistorianEventQueryProtocol.cs | 30 ++- .../WcfEventQueryProtocolTests.cs | 27 +++ .../Program.cs | 186 +++++++++++++++++- 4 files changed, 238 insertions(+), 13 deletions(-) diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs index 18d1b0f..ff0d62f 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs @@ -266,11 +266,17 @@ internal sealed class HistorianGrpcEventOrchestrator new GrpcRetrieval.GetRetrievalInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken); HistorianServerVersionGate.Validate(HistorianServiceInterface.Retrieval, retrievalVersion.UiVersion, _options); + // Version 6 envelope: the stock 2023 R2 client sends v6 (the WCF path's v5 request is accepted + // here but is the legacy format). NECESSARY but not alone sufficient — live validation 2026-06-22 + // showed rows still don't flow on v6 because the read also requires an EVENT-type connection + // (the stock client opens ConnectionType=Event; our OpenSession opens a Process-style 0x402 + // session). See docs/reverse-engineering/grpc-event-query-capture.md "remaining gate". IReadOnlyList attempts = HistorianEventQueryProtocol.CreateStartEventQueryAttempts( startUtc.ToUniversalTime(), endUtc.ToUniversalTime(), eventCount: 5, - filter); + filter, + version: 6); byte[] requestBuffer = attempts[0].RequestBuffer; GrpcRetrieval.StartEventQueryResponse startResponse = retrievalClient.StartEventQuery( diff --git a/src/AVEVA.Historian.Client/Wcf/HistorianEventQueryProtocol.cs b/src/AVEVA.Historian.Client/Wcf/HistorianEventQueryProtocol.cs index 609fcda..8733d80 100644 --- a/src/AVEVA.Historian.Client/Wcf/HistorianEventQueryProtocol.cs +++ b/src/AVEVA.Historian.Client/Wcf/HistorianEventQueryProtocol.cs @@ -8,22 +8,31 @@ internal static class HistorianEventQueryProtocol { public const ushort QueryRequestTypeEvent = 3; + /// + /// Builds the StartEventQuery pRequestBuff. selects the + /// envelope revision: 5 (default) is the native 2020 WCF format used by the WCF event + /// orchestrator; 6 is the 2023 R2 gRPC format. The two envelopes are byte-identical except + /// the leading version word and a 5-byte trailing zero pad — captured 2026-06-22 from the stock + /// 2023 R2 client (see docs/reverse-engineering/grpc-event-query-capture.md). The 2023 R2 + /// server returns rows only for v6; v5 is accepted (StartEventQuery succeeds) but matches no rows. + /// The filter block in the middle is unchanged across versions. + /// public static IReadOnlyList CreateStartEventQueryAttempts( - DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter = null) + DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter = null, ushort version = 5) { List attempts = []; - attempts.Add(CreateNativeFilterAttempt(startUtc, endUtc, eventCount, filter)); + attempts.Add(CreateNativeFilterAttempt(startUtc, endUtc, eventCount, filter, version)); return attempts; } private static HistorianEventQueryAttempt CreateNativeFilterAttempt( - DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter) + DateTime startUtc, DateTime endUtc, uint eventCount, HistorianEventFilter? filter, ushort version) { using MemoryStream stream = new(); using BinaryWriter writer = new(stream, Encoding.Unicode, leaveOpen: true); - writer.Write((ushort)5); + writer.Write(version); writer.Write(startUtc.ToFileTimeUtc()); writer.Write(endUtc.ToFileTimeUtc()); writer.Write(eventCount); @@ -43,10 +52,19 @@ internal static class HistorianEventQueryProtocol WriteMetadataNamespace(writer); writer.Write(0u); + // Version 6 (2023 R2 gRPC) appends a 5-byte trailing zero pad after the v5 terminal — the only + // envelope delta from v5 besides the version word. Captured live: the v6 buffer is the v5 buffer + // (byte 0 = 6) plus these 5 bytes, and is the form the 2023 R2 server returns event rows for. + if (version >= 6) + { + writer.Write(0u); + writer.Write((byte)0); + } + byte[] request = stream.ToArray(); return new HistorianEventQueryAttempt( - filter is null ? "native-empty-filter-version5" : "native-filter-version5", - 5, + filter is null ? $"native-empty-filter-version{version}" : $"native-filter-version{version}", + version, request, Convert.ToHexString(SHA256.HashData(request)).ToLowerInvariant()); } diff --git a/tests/AVEVA.Historian.Client.Tests/WcfEventQueryProtocolTests.cs b/tests/AVEVA.Historian.Client.Tests/WcfEventQueryProtocolTests.cs index 5d76f8e..eab9fda 100644 --- a/tests/AVEVA.Historian.Client.Tests/WcfEventQueryProtocolTests.cs +++ b/tests/AVEVA.Historian.Client.Tests/WcfEventQueryProtocolTests.cs @@ -73,6 +73,33 @@ public sealed class WcfEventQueryProtocolTests Assert.Equal("6b955b02087047a3199a8c74f3eee85c3b49aaa29b05de12eff2dd536f2da0d5", attempt.RequestSha256); } + [Fact] + public void Version6EmptyFilterMatchesCapturedGrpcEnvelope() + { + // Captured 2026-06-22 from the stock 2023 R2 client (docs/reverse-engineering/grpc-event-query-capture.md): + // the v6 StartEventQuery request is byte-identical to the v5 buffer except byte 0 (version 6) and a + // 5-byte trailing zero pad (70 vs 65 bytes). The 2023 R2 server returns event rows only for v6. + DateTime start = new DateTime(2026, 4, 25, 14, 39, 36, 800, DateTimeKind.Utc).AddTicks(1646); + DateTime end = new DateTime(2026, 5, 2, 14, 39, 36, 800, DateTimeKind.Utc).AddTicks(1646); + + byte[] v5 = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(start, end, 3)[0].RequestBuffer; + HistorianEventQueryAttempt v6Attempt = Assert.Single( + HistorianEventQueryProtocol.CreateStartEventQueryAttempts(start, end, 3, filter: null, version: 6)); + byte[] v6 = v6Attempt.RequestBuffer; + + Assert.Equal("native-empty-filter-version6", v6Attempt.Name); + Assert.Equal(6, v6Attempt.Version); + Assert.Equal(70, v6.Length); + Assert.Equal([0x06, 0x00], v6[..2]); + + // v6 == v5 with byte 0 -> 6 and 5 trailing zero bytes appended. + byte[] expected = new byte[70]; + Array.Copy(v5, expected, v5.Length); + expected[0] = 0x06; + Assert.Equal(expected, v6); + Assert.Equal([0x00, 0x00, 0x00, 0x00, 0x00], v6[^5..]); + } + [Fact] public void NativeEmptyFilterAttemptMatchesDecompiledSaveOrder() { diff --git a/tools/AVEVA.Historian.Grpc2023CaptureHarness/Program.cs b/tools/AVEVA.Historian.Grpc2023CaptureHarness/Program.cs index 240cfe4..323c465 100644 --- a/tools/AVEVA.Historian.Grpc2023CaptureHarness/Program.cs +++ b/tools/AVEVA.Historian.Grpc2023CaptureHarness/Program.cs @@ -90,8 +90,10 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness return CaptureWrite(managedDll, args); case "delete-tag": return DeleteTag(managedDll, args); + case "capture-event": + return CaptureEvent(managedDll, args); default: - Console.Error.WriteLine($"Unknown scenario '{scenario}'. Supported: load-check, connect, capture-write, delete-tag."); + Console.Error.WriteLine($"Unknown scenario '{scenario}'. Supported: load-check, connect, capture-write, delete-tag, capture-event."); return 1; } } @@ -114,7 +116,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness Type tagStatusType = Req(asm, "ArchestrA.HistorianTagStatus"); Type tagStatusListType = Req(asm, "ArchestrA.HistorianTagStatusList"); - string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03"; + string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost"; int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565; string certName = GetOption(args, "--cert") ?? server; string tagName = GetOption(args, "--tag") ?? "SdkM3CaptureSandbox"; @@ -201,7 +203,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness /// SendValues (the actual wire push) only runs with --commit. Run with --grpc-rewrite pointing /// at the instrumented copy and AVEVA_HISTORIAN_RE_CAPTURE set to the output file. /// Usage: capture-write --tag SdkM3CaptureSandbox [--create] [--commit] - /// [--server WONDER-SQL-VD03] [--port 32565] [--cert WONDER-SQL-VD03] [--value 123.0] + /// [--server ] [--port 32565] [--cert ] [--value 123.0] /// private static int CaptureWrite(string managedDll, string[] args) { @@ -221,7 +223,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness Type listType = Req(asm, "ArchestrA.HistorianDataValueList"); Type categoryEnum = Req(asm, "ArchestrA.HistorianDataCategory"); - string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03"; + string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost"; int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565; string certName = GetOption(args, "--cert") ?? server; string tagName = GetOption(args, "--tag") ?? "SdkM3CaptureSandbox"; @@ -429,12 +431,184 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness result = m.Invoke(target, a); } + /// + /// Drives the native 2023 R2 client through a read-only gRPC EVENT query so the IL-rewritten + /// GrpcRetrievalClient dumps the uncaptured event buffers: StartEventQuery.requestBuffer (the + /// empty-filter request shape our SDK's CreateNativeEmptyFilterAttempt is being compared against) + /// and GetNextEventQueryResultBuffer.result (the row buffer — proves rows flow when driven right). + /// + /// CRITICAL: the connection is opened with ConnectionType=Event (NOT Process). CreateEventQuery() + /// returns null unless IsEventConnectionRequested() — the native event read runs on ConnectionIndex 1, + /// a separate connection from the process/data path. This is the prime suspect for why the SDK's + /// gRPC empty-filter query returns zero rows despite the server holding events. + /// + /// Sequence: OpenConnection(Event, read-only, gRPC) -> CreateEventQuery() -> + /// EventQueryArgs{StartDateTime,EndDateTime,EventCount} -> EventQuery.StartQuery(args) -> + /// loop EventQuery.MoveNext()/QueryResult -> EventQuery.EndQuery() -> CloseConnection. + /// Run with --grpc-rewrite pointing at the instrumented Archestra.Historian.GrpcClient.dll and + /// AVEVA_HISTORIAN_RE_CAPTURE set to the output NDJSON. Read-only — non-destructive. + /// Usage: capture-event [--server ] [--port 32565] [--cert ] + /// [--lookback-hours 720] [--max-events 50] [--integrated] + /// + private static int CaptureEvent(string managedDll, string[] args) + { + Assembly asm = Assembly.LoadFrom(managedDll); + Type accessType = Req(asm, "ArchestrA.HistorianAccess"); + Type connArgsType = Req(asm, "ArchestrA.HistorianConnectionArgs"); + Type connModeType = Req(asm, "ArchestrA.HistorianConnectionMode"); + Type connTypeType = Req(asm, "ArchestrA.HistorianConnectionType"); + Type errorType = Req(asm, "ArchestrA.HistorianAccessError"); + Type statusType = Req(asm, "ArchestrA.HistorianConnectionStatus"); + Type certInfoType = Req(asm, "ArchestrA.CertificateInfo"); + Type secModeType = Req(asm, "ArchestrA.HistorianSecurityMode"); + Type eventQueryType = Req(asm, "ArchestrA.EventQuery"); + Type eventArgsType = Req(asm, "ArchestrA.EventQueryArgs"); + + string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost"; + int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565; + string certName = GetOption(args, "--cert") ?? server; + int lookbackHours = int.TryParse(GetOption(args, "--lookback-hours"), out int lh) ? lh : 720; + int maxEvents = int.TryParse(GetOption(args, "--max-events"), out int me) ? me : 50; + bool integrated = args.Contains("--integrated"); + string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER"); + string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"); + if (!integrated && string.IsNullOrEmpty(user)) + { + Console.Error.WriteLine("Set HISTORIAN_USER/HISTORIAN_PASSWORD or pass --integrated."); + return 1; + } + + object connArgs = Activator.CreateInstance(connArgsType)!; + SetProp(connArgs, "ServerName", server); + SetProp(connArgs, "TcpPort", checked((ushort)port)); + SetProp(connArgs, "ConnectionMode", Enum.Parse(connModeType, "Historian")); // 2 = gRPC + SetProp(connArgs, "ConnectionType", Enum.Parse(connTypeType, "Event")); // EVENT connection + SetProp(connArgs, "ReadOnly", true); + SetProp(connArgs, "IntegratedSecurity", integrated); + SetProp(connArgs, "AllowUnTrustedConnection", true); + if (!integrated) + { + SetProp(connArgs, "UserName", user!); + SetProp(connArgs, "Password", password ?? string.Empty); + } + object certInfo = Activator.CreateInstance(certInfoType)!; + TrySetProp(certInfo, "CertificateName", certName); + TrySetProp(certInfo, "SecurityMode", Enum.Parse(secModeType, "TransportCertificate")); + TrySetProp(connArgs, "SecurityInfo", certInfo); + + object access = Activator.CreateInstance(accessType)!; + object error = Activator.CreateInstance(errorType)!; + object?[] openArgs = { connArgs, error }; + Console.WriteLine($"OpenConnection: server={server} port={port} mode=Historian type=Event cert={certName} integrated={integrated} readonly=true"); + bool opened; + try + { + opened = (bool)accessType.GetMethod("OpenConnection", new[] { connArgsType, errorType.MakeByRefType() })! + .Invoke(access, openArgs)!; + } + catch (TargetInvocationException tie) + { + Console.Error.WriteLine($"OpenConnection threw: {tie.InnerException?.GetType().Name}: {tie.InnerException?.Message}"); + return 2; + } + Console.WriteLine($"OpenConnection returned: {opened} err={DescribeError(openArgs[1])}"); + if (!opened) { return 2; } + + try + { + // Let the event connection (ConnectionIndex 1) come up. + MethodInfo getStatus = accessType.GetMethod("GetConnectionStatus", new[] { statusType.MakeByRefType() }) + ?? accessType.GetMethods().First(m => m.Name == "GetConnectionStatus" && m.GetParameters().Length == 1); + for (int i = 0; i < 10; i++) + { + object?[] sArgs = { null }; + getStatus.Invoke(access, sArgs); + if (ReadBoolProp(sArgs[0], "ConnectedToServer") || !ReadBoolProp(sArgs[0], "Pending")) break; + System.Threading.Thread.Sleep(500); + } + + // CreateEventQuery() is non-null only when the connection is an event connection. + MethodInfo createEventQuery = accessType.GetMethod("CreateEventQuery", Type.EmptyTypes) + ?? accessType.GetMethods().First(m => m.Name == "CreateEventQuery" && m.GetParameters().Length == 0); + object? eventQuery = createEventQuery.Invoke(access, null); + Console.WriteLine($"CreateEventQuery: {(eventQuery == null ? "NULL (event connection not established!)" : "ok")}"); + if (eventQuery == null) { return 3; } + + // Build EventQueryArgs over the populated window. Times in UTC. + object eventArgs = Activator.CreateInstance(eventArgsType)!; + DateTime endUtc = DateTime.UtcNow; + DateTime startUtc = endUtc.AddHours(-lookbackHours); + TrySetProp(eventArgs, "StartDateTime", DateTime.SpecifyKind(startUtc, DateTimeKind.Utc)); + TrySetProp(eventArgs, "EndDateTime", DateTime.SpecifyKind(endUtc, DateTimeKind.Utc)); + TrySetProp(eventArgs, "EventCount", checked((uint)maxEvents)); + Console.WriteLine($"EventQueryArgs: start={startUtc:o} end={endUtc:o} eventCount={maxEvents}"); + + // StartQuery -> triggers GrpcRetrievalClient.StartEventQuery (requestBuffer CAPTURED). + MethodInfo startQuery = eventQueryType.GetMethods() + .First(m => m.Name == "StartQuery" && m.GetParameters().Length == 2); + object?[] startArgs = { eventArgs, Activator.CreateInstance(errorType) }; + bool started = (bool)startQuery.Invoke(eventQuery, startArgs)!; + Console.WriteLine($"StartQuery: {started} err={DescribeError(startArgs[1])}"); + + // Poll rows -> triggers GetNextEventQueryResultBuffer (result buffer CAPTURED). + MethodInfo moveNext = eventQueryType.GetMethods() + .First(m => m.Name == "MoveNext" && m.GetParameters().Length == 1); + PropertyInfo? queryResult = eventQueryType.GetProperty("QueryResult"); + int rows = 0; + while (rows < maxEvents) + { + object?[] mnArgs = { Activator.CreateInstance(errorType) }; + bool more; + try { more = (bool)moveNext.Invoke(eventQuery, mnArgs)!; } + catch (TargetInvocationException tie) + { + Console.WriteLine($"MoveNext threw: {tie.InnerException?.GetType().Name}: {tie.InnerException?.Message}"); + break; + } + if (!more) + { + Console.WriteLine($"MoveNext: end after {rows} row(s) err={DescribeError(mnArgs[0])}"); + break; + } + rows++; + if (rows <= 3 && queryResult != null) + { + // Print only the event TYPE + time (non-identity) to confirm rows flow. + object? res = queryResult.GetValue(eventQuery); + string typ = res?.GetType().GetProperty("Type")?.GetValue(res)?.ToString() ?? "?"; + object? t = res?.GetType().GetProperty("EventTime")?.GetValue(res); + Console.WriteLine($" row {rows}: Type={typ} EventTime={t}"); + } + } + Console.WriteLine($"Rows iterated: {rows}"); + + MethodInfo? endQuery = eventQueryType.GetMethods() + .FirstOrDefault(m => m.Name == "EndQuery" && m.GetParameters().Length == 1); + if (endQuery != null) + { + object?[] eqArgs = { Activator.CreateInstance(errorType) }; + endQuery.Invoke(eventQuery, eqArgs); + } + Console.WriteLine(rows > 0 ? "CAPTURE-EVENT: PASS (rows flowed)" : "CAPTURE-EVENT: request captured (zero rows)"); + return 0; + } + finally + { + try + { + MethodInfo? close = accessType.GetMethod("CloseConnection", new[] { errorType.MakeByRefType() }); + if (close != null) close.Invoke(access, new object?[] { Activator.CreateInstance(errorType) }); + } + catch { /* best-effort */ } + } + } + /// /// Read-only gRPC connect probe: opens a 2023 R2 Historian (mode=Historian) connection via the /// native client and reports the resulting connection status. Proves the mixed-mode client can /// reach the live server over gRPC from this box — the foundation for the write-capture step. /// Reads creds from HISTORIAN_USER / HISTORIAN_PASSWORD (explicit) or uses IntegratedSecurity. - /// Usage: connect --server WONDER-SQL-VD03 [--port 32565] [--cert WONDER-SQL-VD03] [--integrated] + /// Usage: connect --server [--port 32565] [--cert ] [--integrated] /// private static int Connect(string managedDll, string[] args) { @@ -448,7 +622,7 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness Type certInfoType = Req(asm, "ArchestrA.CertificateInfo"); Type secModeType = Req(asm, "ArchestrA.HistorianSecurityMode"); - string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03"; + string server = GetOption(args, "--server") ?? Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "localhost"; int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565; string certName = GetOption(args, "--cert") ?? server; bool integrated = args.Contains("--integrated");