feat(grpc): route ReadEvents over gRPC + extract shared CM_EVENT registration
Adds HistorianGrpcEventOrchestrator: opens a read-only gRPC session, replays the CM_EVENT registration (UpdateClientStatus -> 6 GetSystemParameter -> RegisterTags -> cross-service version probes -> EnsureTags), then StartEventQuery -> loop GetNextEventQueryResultBuffer -> EndEventQuery, reusing the WCF query builder and row parser verbatim. Routed in Historian2020ProtocolDialect on UseGrpc. The captured registration buffers (CmEventTagId, UpdC3 blob, RTag2 buffer, GETHI builder, pre-register param list, native-error decode) are extracted into a shared HistorianEventRegistrationProtocol so the WCF and gRPC paths can't drift; the WCF orchestrator is refactored onto it with no behavior change. Live finding (2026-06-22): the chain runs and StartEventQuery succeeds, but the gRPC server long-polls GetNextEventQueryResultBuffer on no data (it blocks to the deadline instead of returning the synchronous 5-byte code-85 terminal the WCF op returns). Per-call gRPC-Web deadlines proved unreliable over a tunnel, so the read is hard-bounded by an overall linked-CTS budget (<=30s; gRPC honors token cancellation). On the no-row path it throws ProtocolEvidenceMissingException rather than assert a false-empty list. Row-level retrieval awaits an event-bearing 2023 R2 server (the dev box holds no events). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -0,0 +1,102 @@
|
||||
using System.Buffers.Binary;
|
||||
using System.Text;
|
||||
|
||||
namespace AVEVA.Historian.Client.Wcf;
|
||||
|
||||
/// <summary>
|
||||
/// Captured byte buffers for the native CM_EVENT registration sequence that both the WCF and gRPC
|
||||
/// event orchestrators replay before <c>StartEventQuery</c>. Extracted to a single source of truth so
|
||||
/// the two transports cannot drift on these reverse-engineered constants. The bytes are captured
|
||||
/// byte-for-byte from a successful native event read via the instrument-wcf-{write,read}message
|
||||
/// IL-rewrite tool (see <see cref="HistorianWcfEventOrchestrator"/> remarks for record references).
|
||||
/// </summary>
|
||||
internal static class HistorianEventRegistrationProtocol
|
||||
{
|
||||
/// <summary>
|
||||
/// Documented native CM_EVENT default tag id used by aahClientManaged.dll
|
||||
/// CreateDefaultEventTag → ConvertEventTagToTagMetadata. Registering this tag (RegisterTags2 /
|
||||
/// HistoryService.RegisterTags) before StartEventQuery subscribes the session to CM_EVENT
|
||||
/// events; without it, GetNextEventQueryResultBuffer returns native error type=4 code=85 (0x55).
|
||||
/// </summary>
|
||||
public static readonly Guid CmEventTagId = new("353b8145-5df0-4d46-a253-871aef49b321");
|
||||
|
||||
/// <summary>
|
||||
/// The 6 system-parameter names the native client queries (records 11-16) between UpdC3 and
|
||||
/// RTag2. They appear informational, but are replayed to put the server-side session into the
|
||||
/// state EnsT2 expects.
|
||||
/// </summary>
|
||||
public static readonly string[] StatusParametersBeforeRegister =
|
||||
[
|
||||
"AllowOriginals",
|
||||
"HistorianPartner",
|
||||
"HistorianVersion",
|
||||
"MaxCyclicStorageTimeout",
|
||||
"RealTimeWindow",
|
||||
"FutureTimeThreshold",
|
||||
];
|
||||
|
||||
/// <summary>
|
||||
/// Native GETHI pRequestBuff layout for a parameter-name query: 8-byte header
|
||||
/// (UInt16 0x6753 + UInt16 0x0002 + UInt32 nameLength) + UTF-16 LE chars (no trailing null byte —
|
||||
/// observed truncated by 1 byte vs full UTF-16 in the captured native bytes). Layout taken from
|
||||
/// writemessage-capture-event-latest.ndjson record 8.
|
||||
/// </summary>
|
||||
public static byte[] BuildGetHistorianInfoRequest(string parameterName)
|
||||
{
|
||||
byte[] nameBytes = Encoding.Unicode.GetBytes(parameterName);
|
||||
// Native truncates the trailing high byte of the last UTF-16 char.
|
||||
int payloadLength = nameBytes.Length > 0 ? nameBytes.Length - 1 : 0;
|
||||
byte[] buffer = new byte[8 + payloadLength];
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(0, 2), 0x6753);
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(2, 2), 0x0002);
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), (uint)parameterName.Length);
|
||||
Buffer.BlockCopy(nameBytes, 0, buffer, 8, payloadLength);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 81-byte UpdC3 clientStatus blob captured from a native event read (record 10 of
|
||||
/// writemessage-capture-event-latest.ndjson). Layout: 0x02 0x01 + 76 zero bytes +
|
||||
/// uint32(0x0000001E). The trailing 30 is likely an interval / timeout in seconds; all other
|
||||
/// observed fields are zero for a fresh session.
|
||||
/// </summary>
|
||||
public static byte[] BuildUpdateClientStatusBlob()
|
||||
{
|
||||
byte[] blob = new byte[81];
|
||||
blob[0] = 0x02;
|
||||
blob[1] = 0x01;
|
||||
blob[77] = 0x1E;
|
||||
return blob;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 24-byte RTag2 pInBuff captured from a native event read (record 17). Layout: 8-byte header
|
||||
/// (0x50 0x67 0x02 0x00 + uint32 element count = 1) + 16-byte tag id GUID.
|
||||
/// </summary>
|
||||
public static byte[] BuildRegisterCmEventInputBuffer()
|
||||
{
|
||||
byte[] buffer = new byte[24];
|
||||
buffer[0] = 0x50;
|
||||
buffer[1] = 0x67;
|
||||
buffer[2] = 0x02;
|
||||
buffer[3] = 0x00;
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), 1u);
|
||||
CmEventTagId.ToByteArray().CopyTo(buffer.AsSpan(8, 16));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Describes a native 5-byte error/terminal buffer: byte0 = type, bytes 1-4 = LE uint32 code.
|
||||
/// </summary>
|
||||
public static string DescribeNativeError(byte[] errorBuffer)
|
||||
{
|
||||
if (errorBuffer.Length < 5)
|
||||
{
|
||||
return "<short>";
|
||||
}
|
||||
|
||||
byte type = errorBuffer[0];
|
||||
uint code = BinaryPrimitives.ReadUInt32LittleEndian(errorBuffer.AsSpan(1, 4));
|
||||
return $"type={type} code={code} (0x{code:X})";
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
using System.Buffers.Binary;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Runtime.Versioning;
|
||||
using System.ServiceModel;
|
||||
@@ -29,15 +28,6 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
private const uint NativeClientVersionInt = 999_999;
|
||||
private const ushort NativeOpen2ClientVersion = 9;
|
||||
|
||||
/// <summary>
|
||||
/// Documented native CM_EVENT default tag id used by aahClientManaged.dll
|
||||
/// CreateDefaultEventTag → ConvertEventTagToTagMetadata. Registering this tag via
|
||||
/// IHistoryServiceContract2.RegisterTags2 before StartEventQuery causes the server
|
||||
/// to subscribe the session to CM_EVENT events; without it,
|
||||
/// GetNextEventQueryResultBuffer returns native error type=4 code=85 (0x55).
|
||||
/// </summary>
|
||||
private static readonly Guid CmEventTagId = new("353b8145-5df0-4d46-a253-871aef49b321");
|
||||
|
||||
private readonly HistorianClientOptions _options;
|
||||
|
||||
public HistorianWcfEventOrchestrator(HistorianClientOptions options)
|
||||
@@ -333,11 +323,11 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
TryRun(() => statusChannel.GetInterfaceVersion(out _));
|
||||
TryRun(() => statusChannel.GetInterfaceVersion(out _));
|
||||
|
||||
byte[] historianVersionRequest = BuildGetHistorianInfoRequest("HistorianVersion");
|
||||
byte[] historianVersionRequest = HistorianEventRegistrationProtocol.BuildGetHistorianInfoRequest("HistorianVersion");
|
||||
TryRun(() => statusChannel.GetHistorianInfo(handle, historianVersionRequest, out _, out _));
|
||||
TryRun(() => statusChannel.GetHistorianInfo(handle, historianVersionRequest, out _, out _));
|
||||
|
||||
byte[] clientStatus = BuildUpdC3ClientStatusBlob();
|
||||
byte[] clientStatus = HistorianEventRegistrationProtocol.BuildUpdateClientStatusBlob();
|
||||
bool updSuccess = historyChannel.UpdateClientStatus3(
|
||||
handle: handle,
|
||||
clientStatusSize: (uint)clientStatus.Length,
|
||||
@@ -349,12 +339,12 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
LastUpdC3ReturnCode = updSuccess ? 0u : 1u;
|
||||
|
||||
// Records 11-16: 6 system-parameter queries before RTag2.
|
||||
foreach (string parameterName in NativeStatusParametersBeforeRTag2)
|
||||
foreach (string parameterName in HistorianEventRegistrationProtocol.StatusParametersBeforeRegister)
|
||||
{
|
||||
TryRun(() => statusChannel.GetSystemParameter(context.ClientHandle, parameterName, out _, out _, out _));
|
||||
}
|
||||
|
||||
byte[] registerBuffer = BuildRTag2CmEventInputBuffer();
|
||||
byte[] registerBuffer = HistorianEventRegistrationProtocol.BuildRegisterCmEventInputBuffer();
|
||||
bool registerSuccess = historyChannel.RegisterTags2(
|
||||
handle: handle,
|
||||
elementCount: 1,
|
||||
@@ -407,84 +397,14 @@ internal sealed class HistorianWcfEventOrchestrator
|
||||
}
|
||||
}
|
||||
|
||||
private static readonly string[] NativeStatusParametersBeforeRTag2 =
|
||||
[
|
||||
"AllowOriginals",
|
||||
"HistorianPartner",
|
||||
"HistorianVersion",
|
||||
"MaxCyclicStorageTimeout",
|
||||
"RealTimeWindow",
|
||||
"FutureTimeThreshold",
|
||||
];
|
||||
|
||||
private static void TryRun(Action action)
|
||||
{
|
||||
try { action(); }
|
||||
catch { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Native GETHI pRequestBuff layout for a parameter-name query: 8-byte header
|
||||
/// (UInt16 0x6753 + UInt16 0x0002 + UInt32 nameLength) + UTF-16 LE chars (no
|
||||
/// trailing null byte — observed truncated by 1 byte vs full UTF-16 in the
|
||||
/// captured native bytes). Layout taken from
|
||||
/// writemessage-capture-event-latest.ndjson record 8.
|
||||
/// </summary>
|
||||
private static byte[] BuildGetHistorianInfoRequest(string parameterName)
|
||||
{
|
||||
byte[] nameBytes = System.Text.Encoding.Unicode.GetBytes(parameterName);
|
||||
// Native truncates the trailing high byte of the last UTF-16 char.
|
||||
int payloadLength = nameBytes.Length > 0 ? nameBytes.Length - 1 : 0;
|
||||
byte[] buffer = new byte[8 + payloadLength];
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(0, 2), 0x6753);
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(2, 2), 0x0002);
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), (uint)parameterName.Length);
|
||||
Buffer.BlockCopy(nameBytes, 0, buffer, 8, payloadLength);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 81-byte UpdC3 clientStatus blob captured from a native event read (record 10 of
|
||||
/// writemessage-capture-event-latest.ndjson). Layout: 0x02 0x01 + 76 zero bytes +
|
||||
/// uint32(0x0000001E). The trailing 30 is likely an interval / timeout in seconds; all
|
||||
/// other observed fields are zero for a fresh session.
|
||||
/// </summary>
|
||||
private static byte[] BuildUpdC3ClientStatusBlob()
|
||||
{
|
||||
byte[] blob = new byte[81];
|
||||
blob[0] = 0x02;
|
||||
blob[1] = 0x01;
|
||||
blob[77] = 0x1E;
|
||||
return blob;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 24-byte RTag2 pInBuff captured from a native event read (record 17). Layout:
|
||||
/// 8-byte header (0x50 0x67 0x02 0x00 + uint32 element count = 1) + 16-byte tag id GUID.
|
||||
/// </summary>
|
||||
private static byte[] BuildRTag2CmEventInputBuffer()
|
||||
{
|
||||
byte[] buffer = new byte[24];
|
||||
buffer[0] = 0x50;
|
||||
buffer[1] = 0x67;
|
||||
buffer[2] = 0x02;
|
||||
buffer[3] = 0x00;
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), 1u);
|
||||
CmEventTagId.ToByteArray().CopyTo(buffer.AsSpan(8, 16));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private static string DescribeNativeError(byte[] errorBuffer)
|
||||
{
|
||||
if (errorBuffer.Length < 5)
|
||||
{
|
||||
return "<short>";
|
||||
}
|
||||
|
||||
byte type = errorBuffer[0];
|
||||
uint code = BinaryPrimitives.ReadUInt32LittleEndian(errorBuffer.AsSpan(1, 4));
|
||||
return $"type={type} code={code} (0x{code:X})";
|
||||
}
|
||||
=> HistorianEventRegistrationProtocol.DescribeNativeError(errorBuffer);
|
||||
|
||||
private static void CloseChannelSafely(ICommunicationObject channel)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user