using System.Runtime.CompilerServices;
using Google.Protobuf;
using Grpc.Core;
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Wcf;
using GrpcHistory = ArchestrA.Grpc.Contract.History;
using GrpcRetrieval = ArchestrA.Grpc.Contract.Retrieval;
using GrpcStatus = ArchestrA.Grpc.Contract.Status;
using GrpcTransaction = ArchestrA.Grpc.Contract.Transaction;

namespace AVEVA.Historian.Client.Grpc;

/// <summary>
/// 2023 R2 gRPC event-read orchestrator. Mirrors the WCF event-read path over the
/// gRPC transport: the same CM_EVENT registration sequence and the same event request/row buffers
/// travel inside protobuf <c>bytes</c> fields, reusing the proven WCF serializers/parsers verbatim.
/// </summary>
/// <remarks>
/// <para>Operation mapping (2020 WCF → 2023 R2 gRPC):</para>
/// <code>
/// Hist.UpdateClientStatus3                   → HistoryService.UpdateClientStatus
/// Hist.RegisterTags2                         → HistoryService.RegisterTags
/// Hist.EnsureTags2                           → HistoryService.EnsureTags
/// Stat.GetHistorianInfo                      → StatusService.GetHistorianInfo
/// Stat.GetSystemParameter                    → StatusService.GetSystemParameter
/// Retr.StartEventQuery                       → RetrievalService.StartEventQuery
/// Retr.GetNextEventQueryResultBuffer (loop)  → RetrievalService.GetNextEventQueryResultBuffer
/// Retr.EndEventQuery                         → RetrievalService.EndEventQuery
/// </code>
/// <para>
/// The CM_EVENT registration replay (<see cref="RegisterCmEventTag"/>) is the hard part: without it
/// the server returns native error type=4 code=85 from GetNextEventQueryResultBuffer. The captured
/// registration buffers are shared with the WCF path via
/// <see cref="HistorianEventRegistrationProtocol"/> so the two transports cannot drift. The gRPC
/// RetrievalService event ops do NOT need the WCF Retr.GetV/IsOriginalAllowed prime
/// (the read path proved the front-door session is sufficient over gRPC).
/// </para>
/// <para>
/// Live status (2026-06-22): the chain runs end-to-end and StartEventQuery succeeds, but
/// GetNextEventQueryResultBuffer long-polls when the query has no rows — it blocks to the
/// call deadline instead of returning the synchronous 5-byte code-85 terminal the 2020 WCF op returns.
/// A poll-deadline expiry is therefore treated as the no-data terminal (see the loop in
/// <see cref="RunEventQuery"/>). The idle dev box holds no events, so row-level retrieval is not yet
/// live-verified; verifying parsed rows over gRPC awaits an event-bearing 2023 R2 server. This is
/// tooled + completes cleanly, NOT proven to return rows.
/// </para>
/// <para>
/// The diagnostic properties are plain mutable state that is reset at the start of every read, so
/// they describe the most recent read only.
/// </para>
/// </remarks>
internal sealed class HistorianGrpcEventOrchestrator
{
    /// <summary>Upper bound on the wall-clock budget of one whole event read.</summary>
    private static readonly TimeSpan OverallBudgetCap = TimeSpan.FromSeconds(30);

    /// <summary>Upper bound on a single GetNextEventQueryResultBuffer long-poll.</summary>
    private static readonly TimeSpan EventPollCap = TimeSpan.FromSeconds(10);

    /// <summary>Upper bound on each best-effort registration RPC.</summary>
    private static readonly TimeSpan RegistrationCap = TimeSpan.FromSeconds(5);

    /// <summary>Upper bound on the best-effort EndEventQuery cleanup RPC.</summary>
    private static readonly TimeSpan CleanupCap = TimeSpan.FromSeconds(5);

    private readonly HistorianClientOptions _options;

    /// <summary>Creates an orchestrator bound to the given client options.</summary>
    /// <param name="options">Connection/credential/timeout settings; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public HistorianGrpcEventOrchestrator(HistorianClientOptions options)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
    }

    /// <summary>Diagnostic: length of the most recent event-row result buffer the server sent.</summary>
    public int LastResultBufferLength { get; private set; }

    /// <summary>Diagnostic: type+code description of the most recent error/terminal buffer.</summary>
    public string LastErrorBufferDescription { get; private set; } = string.Empty;

    /// <summary>Diagnostic: which transport the event channel used (grpc-web or http2).</summary>
    public string EventChannelMode { get; private set; } = string.Empty;

    /// <summary>Diagnostic: hex of the most recent result buffer (first 48 bytes).</summary>
    public string LastResultBufferHex { get; private set; } = string.Empty;

    /// <summary>Diagnostic: hex of the most recent GetNext error buffer.</summary>
    public string LastErrorBufferHex { get; private set; } = string.Empty;

    /// <summary>Diagnostic: outcomes of the key CM_EVENT registration RPCs of the most recent read.</summary>
    public string RegistrationDiag { get; private set; } = string.Empty;

    /// <summary>
    /// Hard wall-clock budget for the entire gRPC event read. Bounds the chain deterministically since
    /// per-call gRPC-Web deadlines are unreliable over a tunnel. Scaled off the request timeout but
    /// capped so a long default timeout cannot make the (currently row-unverified) read stall for minutes.
    /// </summary>
    private TimeSpan OverallBudget => BoundedTimeout(OverallBudgetCap);

    /// <summary>
    /// Runs the full event-read chain (session open, CM_EVENT registration, event query) on a
    /// thread-pool thread and then yields the collected events one by one.
    /// </summary>
    /// <param name="startUtc">Start of the queried time range.</param>
    /// <param name="endUtc">End of the queried time range.</param>
    /// <param name="filter">Optional event filter forwarded to the query request builder.</param>
    /// <param name="cancellationToken">Caller cancellation; linked with the overall budget.</param>
    /// <returns>The events returned by the server, buffered before the first one is yielded.</returns>
    /// <exception cref="ProtocolEvidenceMissingException">
    /// No usable credentials are configured, the overall budget expired, or the query completed with
    /// zero rows (which is not yet distinguishable from an incomplete registration over gRPC).
    /// </exception>
    public async IAsyncEnumerable<HistorianEvent> ReadEventsAsync(
        DateTime startUtc,
        DateTime endUtc,
        HistorianEventFilter? filter,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (!_options.IntegratedSecurity && string.IsNullOrEmpty(_options.UserName))
        {
            throw new ProtocolEvidenceMissingException(
                "Managed gRPC event flow currently requires IntegratedSecurity or an explicit UserName + Password.");
        }

        cancellationToken.ThrowIfCancellationRequested();

        // Hard overall cap. The per-call gRPC-Web deadlines are NOT honored reliably over a tunnelled
        // link (observed live 2026-06-22: a chain with 4s per-call deadlines still ran >90s because the
        // server stalls several registration RPCs and long-polls GetNext). gRPC DOES honor token
        // cancellation, so a linked CTS firing at OverallBudget bounds the whole read deterministically.
        // A budget timeout on the unverified no-row path is surfaced as ProtocolEvidenceMissing, not as
        // a raw cancellation, so callers get the same honest "not row-verified over gRPC" signal.
        TimeSpan budget = OverallBudget;
        using CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        linked.CancelAfter(budget);

        IReadOnlyList<HistorianEvent> events;
        try
        {
            events = await Task.Run(
                () => RunEventChain(startUtc, endUtc, filter, linked.Token),
                linked.Token).ConfigureAwait(false);
        }
        catch (Exception ex) when (IsBudgetCancellation(ex, linked, cancellationToken))
        {
            throw new ProtocolEvidenceMissingException(
                $"ReadEvents over gRPC did not return rows within {budget.TotalSeconds:0}s: StartEventQuery " +
                "succeeds but the CM_EVENT registration replay stalls and GetNextEventQueryResultBuffer long-polls " +
                "(no synchronous code-85 terminal over gRPC). Row-level retrieval is not yet verified over gRPC " +
                "(the dev box holds no events) — use the WCF transport for event reads.");
        }

        // The yield loop lives outside the try/catch above because C# does not allow yield inside a
        // try block that has a catch clause.
        foreach (HistorianEvent evt in events)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return evt;
        }
    }

    /// <summary>
    /// True when an exception was caused by the overall-budget linked CTS firing (not by the caller's
    /// own cancellation). The budget surfaces either as an <see cref="OperationCanceledException"/>
    /// (Task.Run / token checks) or a gRPC <see cref="RpcException"/> with
    /// <see cref="StatusCode.Cancelled"/> or <see cref="StatusCode.DeadlineExceeded"/> from an in-flight RPC.
    /// </summary>
    private static bool IsBudgetCancellation(Exception ex, CancellationTokenSource linked, CancellationToken caller)
    {
        // Caller cancellation also cancels the linked source, so it has to be ruled out first.
        if (caller.IsCancellationRequested || !linked.IsCancellationRequested)
        {
            return false;
        }

        return ex is OperationCanceledException
            || (ex is RpcException rpc
                && (rpc.StatusCode is StatusCode.Cancelled or StatusCode.DeadlineExceeded));
    }

    /// <summary>
    /// Synchronous body of the read: opens the channel and an event session, replays the CM_EVENT
    /// registration, runs the event query and returns the parsed rows.
    /// </summary>
    /// <exception cref="ProtocolEvidenceMissingException">The query finished with zero rows.</exception>
    private List<HistorianEvent> RunEventChain(DateTime startUtc, DateTime endUtc, HistorianEventFilter? filter, CancellationToken cancellationToken)
    {
        // Diagnostics describe one read; clear what a previous read on this instance left behind
        // (RegistrationDiag in particular is append-only within a read).
        ResetDiagnostics();

        // Hypothesis #1 (server-side/connection angle, grpc-event-query-capture.md): the native client
        // uses Grpc.Core native HTTP/2, while our default channel wraps gRPC-Web over HTTP/1.1. Reads
        // work over gRPC-Web, but the connection-scoped event query may require a true HTTP/2 connection.
        // Opt in via HISTORIAN_GRPC_EVENT_HTTP2=1 to use a plain HTTP/2 channel for the event path only.
        bool useHttp2 = string.Equals(
            Environment.GetEnvironmentVariable("HISTORIAN_GRPC_EVENT_HTTP2"), "1", StringComparison.Ordinal);
        EventChannelMode = useHttp2 ? "http2" : "grpc-web";

        using HistorianGrpcConnection connection = useHttp2
            ? HistorianGrpcChannelFactory.CreateHttp2(_options)
            : HistorianGrpcChannelFactory.Create(_options);

        // Event reads need an Event-type (v8) connection. OpenSession(eventConnection: true) runs the
        // full v8 path: HistoryService.ExchangeKey (P-256 ECDH) -> client key = SHA256(secret) -> v8
        // OpenConnection with ConnectionType=Event and the credential token RC4(password, MD5(clientKey)).
        HistorianGrpcHandshake.Session session =
            HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, eventConnection: true);

        RegisterCmEventTag(connection, session, cancellationToken);
        List<HistorianEvent> events = RunEventQuery(connection, session, startUtc, endUtc, filter, cancellationToken);

        // Honest no-data handling: when the query returns real rows, hand them back. When it instead
        // reaches the no-data terminal with ZERO rows (the gRPC server long-polls GetNext rather than
        // returning the WCF code-85 terminal), we cannot distinguish "genuinely no events in range"
        // from "the CM_EVENT registration replay didn't fully land over gRPC" — so we refuse to return
        // a possibly-false empty list and surface the unverified state instead. An event-bearing 2023 R2
        // server will return rows here and exercise the parse path; flip this once that is captured.
        if (events.Count == 0)
        {
            throw new ProtocolEvidenceMissingException(
                "ReadEvents over gRPC: the chain completes and StartEventQuery succeeds, but " +
                "GetNextEventQueryResultBuffer returns no rows (it long-polls to the no-data terminal " +
                $"after the CM_EVENT registration replay; last={LastErrorBufferDescription}). Row-level " +
                "retrieval is not yet verified over gRPC (the dev box holds no events) — use the WCF " +
                "transport for event reads until a capture against an event-bearing 2023 R2 server confirms it.");
        }

        return events;
    }

    /// <summary>Clears every per-read diagnostic property.</summary>
    private void ResetDiagnostics()
    {
        LastResultBufferLength = 0;
        LastErrorBufferDescription = string.Empty;
        LastResultBufferHex = string.Empty;
        LastErrorBufferHex = string.Empty;
        EventChannelMode = string.Empty;
        RegistrationDiag = string.Empty;
    }

    /// <summary>
    /// Returns the configured <see cref="HistorianClientOptions.RequestTimeout"/> when it is positive
    /// and shorter than <paramref name="cap"/>; otherwise returns <paramref name="cap"/>. A zero or
    /// negative timeout (including <see cref="Timeout.InfiniteTimeSpan"/>) falls back to the cap so the
    /// bounded phases stay bounded.
    /// </summary>
    private TimeSpan BoundedTimeout(TimeSpan cap)
    {
        TimeSpan configured = _options.RequestTimeout;
        return configured > TimeSpan.Zero && configured < cap ? configured : cap;
    }

    /// <summary>Deadline for the regular RPCs: now plus the full configured request timeout.</summary>
    private DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout);

    /// <summary>
    /// Deadline for the GetNextEventQueryResultBuffer long-poll. Bounded to at most 10s (or the
    /// configured <see cref="HistorianClientOptions.RequestTimeout"/> if shorter) so the no-data
    /// terminal — a deadline expiry over gRPC — is reached promptly instead of stalling the read for
    /// the full request timeout. When rows are available the server returns them well before this.
    /// </summary>
    private DateTime EventPollDeadline() => DateTime.UtcNow.Add(BoundedTimeout(EventPollCap));

    /// <summary>
    /// Deadline for the best-effort registration RPCs. Bounded to at most 5s: several of these
    /// (RegisterTags / EnsureTags / GetHistorianInfo) stall server-side on the remote 2023 R2
    /// box (observed live 2026-06-22) and only return at their deadline, so an unbounded
    /// <see cref="HistorianClientOptions.RequestTimeout"/> would make the registration phase dominate
    /// the read. Their failures are swallowed regardless of outcome.
    /// </summary>
    private DateTime RegistrationDeadline() => DateTime.UtcNow.Add(BoundedTimeout(RegistrationCap));

    /// <summary>
    /// Replays the native 2023 R2 gRPC event-connection registration sequence: UpdateClientStatus →
    /// RegisterTags(CM_EVENT) → EnsureTags(CM_EVENT) → GetHistorianInfo → GetSystemParameter ×7.
    /// Best-effort: each call is wrapped so an individual rejection on this server does not abort the
    /// chain — the goal is to drive the server-side session into the state StartEventQuery /
    /// GetNextEventQueryResultBuffer expect. RegisterTags and EnsureTags additionally record their
    /// outcome in <see cref="RegistrationDiag"/>. Buffers come from
    /// <see cref="HistorianEventRegistrationProtocol"/> and <see cref="HistorianAddTagsProtocol"/>.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled between two registration steps. RPC failures
    /// themselves are swallowed, so cancellation is checked explicitly before each step instead of
    /// issuing the remaining calls against an already-cancelled token.
    /// </exception>
    private void RegisterCmEventTag(HistorianGrpcConnection connection, HistorianGrpcHandshake.Session session, CancellationToken cancellationToken)
    {
        var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
        var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel);

        // Native 2023 R2 gRPC event-connection registration sequence (captured order):
        //   UpdateClientStatus -> RegisterTags(CM_EVENT) -> EnsureTags(CM_EVENT) -> GetHistorianInfo
        //   -> GetSystemParameter x7. (StartEventQuery follows in RunEventQuery.) The 2020-WCF-era extra
        //   probes (cross-service GetV, params-before-register) are NOT in the gRPC event flow.
        cancellationToken.ThrowIfCancellationRequested();
        byte[] clientStatus = HistorianEventRegistrationProtocol.BuildUpdateClientStatusBlob();
        TryRun(() => historyClient.UpdateClientStatus(
            new GrpcHistory.UpdateClientStatusRequest
            {
                StrHandle = session.StringHandle,
                BtClientStatus = ByteString.CopyFrom(clientStatus)
            },
            connection.Metadata, RegistrationDeadline(), cancellationToken));

        cancellationToken.ThrowIfCancellationRequested();
        byte[] registerBuffer = HistorianEventRegistrationProtocol.BuildRegisterCmEventInputBuffer();
        try
        {
            GrpcHistory.RegisterTagsResponse rt = historyClient.RegisterTags(
                new GrpcHistory.RegisterTagsRequest
                {
                    StrHandle = session.StringHandle,
                    BtTagInfos = ByteString.CopyFrom(registerBuffer)
                },
                connection.Metadata, RegistrationDeadline(), cancellationToken);
            RegistrationDiag += $"RTag={rt.Status?.BSuccess} e={Convert.ToHexString(rt.Status?.BtError?.ToByteArray() ?? [])}; ";
        }
        catch (Exception ex)
        {
            // Best-effort step: keep only the exception type as a diagnostic and carry on.
            RegistrationDiag += $"RTag=EX:{ex.GetType().Name}; ";
        }

        // gRPC CM_EVENT EnsureTags uses the 86-byte native format (8-byte header + the …2f27 event-type
        // GUID), NOT the 2020 WCF CTagMetadata.
        cancellationToken.ThrowIfCancellationRequested();
        byte[] payload = HistorianAddTagsProtocol.SerializeCmEventEnsureTagsGrpc(DateTime.UtcNow);
        try
        {
            GrpcHistory.EnsureTagsResponse et = historyClient.EnsureTags(
                new GrpcHistory.EnsureTagsRequest
                {
                    StrHandle = session.StringHandle,
                    BtTagInfos = ByteString.CopyFrom(payload),
                    ElementCount = 1
                },
                connection.Metadata, RegistrationDeadline(), cancellationToken);
            RegistrationDiag += $"EnsT={et.Status?.BSuccess} e={Convert.ToHexString(et.Status?.BtError?.ToByteArray() ?? [])} out={Convert.ToHexString(et.BtTagStatus?.ToByteArray() ?? [])}; ";
        }
        catch (Exception ex)
        {
            // Best-effort step: keep only the exception type as a diagnostic and carry on.
            RegistrationDiag += $"EnsT=EX:{ex.GetType().Name}; ";
        }

        cancellationToken.ThrowIfCancellationRequested();
        byte[] historianVersionRequest = HistorianEventRegistrationProtocol.BuildGetHistorianInfoRequest("HistorianVersion");
        TryRun(() => statusClient.GetHistorianInfo(
            new GrpcStatus.GetHistorianInfoRequest
            {
                StrHandle = session.StringHandle,
                BtRequest = ByteString.CopyFrom(historianVersionRequest)
            },
            connection.Metadata, RegistrationDeadline(), cancellationToken));

        string[] eventParams =
        [
            "AllowOriginals",
            "HistorianPartner",
            "HistorianVersion",
            "MaxCyclicStorageTimeout",
            "RealTimeWindow",
            "FutureTimeThreshold",
            "AllowRenameTags"
        ];
        foreach (string parameterName in eventParams)
        {
            cancellationToken.ThrowIfCancellationRequested();
            TryRun(() => statusClient.GetSystemParameter(
                new GrpcStatus.GetSystemParameterRequest
                {
                    UiHandle = session.ClientHandle,
                    StrParameterName = parameterName
                },
                connection.Metadata, RegistrationDeadline(), cancellationToken));
        }
    }

    /// <summary>
    /// Runs StartEventQuery, drains GetNextEventQueryResultBuffer until a terminal condition, and
    /// always issues EndEventQuery for a query that was started.
    /// </summary>
    /// <returns>The rows parsed from every non-empty result buffer; possibly empty.</returns>
    /// <exception cref="InvalidOperationException">
    /// StartEventQuery reported failure, or GetNext reported failure with an error buffer that is not
    /// the 5-byte type=4 soft terminal.
    /// </exception>
    private List<HistorianEvent> RunEventQuery(
        HistorianGrpcConnection connection,
        HistorianGrpcHandshake.Session session,
        DateTime startUtc,
        DateTime endUtc,
        HistorianEventFilter? filter,
        CancellationToken cancellationToken)
    {
        // HTTP/2-frame capture (grpc-event-query-capture.md #3) showed the stock client runs the event
        // query on a DEDICATED RetrievalService TLS connection, separate from the HistoryService
        // connection that opened+registered the session (correlated only by the session handle); our SDK
        // collapses every service onto one connection. Opt in via HISTORIAN_GRPC_EVENT_SPLIT_CHANNEL=1 to
        // run StartEventQuery/GetNext/EndEventQuery on their own connection (mirrors native conn4: no
        // re-handshake, just the existing handle), to test whether topology is the row-scoping gate.
        bool splitChannel = string.Equals(
            Environment.GetEnvironmentVariable("HISTORIAN_GRPC_EVENT_SPLIT_CHANNEL"), "1", StringComparison.Ordinal);
        HistorianGrpcConnection rconn = splitChannel ? HistorianGrpcChannelFactory.Create(_options) : connection;
        try
        {
            var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(rconn.Channel);

            GrpcRetrieval.GetRetrievalInterfaceVersionResponse retrievalVersion = retrievalClient.GetRetrievalInterfaceVersion(
                new GrpcRetrieval.GetRetrievalInterfaceVersionRequest(),
                rconn.Metadata, Deadline(), cancellationToken);
            HistorianServerVersionGate.Validate(HistorianServiceInterface.Retrieval, retrievalVersion.UiVersion, _options);

            // Version 6 envelope: the stock 2023 R2 client sends v6 (the WCF path's v5 request is accepted
            // here but is the legacy format). NECESSARY but not alone sufficient — live validation 2026-06-22
            // showed rows still don't flow on v6 alone because the read also requires an EVENT-type
            // connection (the stock client opens ConnectionType=Event). RunEventChain requests that via
            // OpenSession(eventConnection: true).
            // See docs/reverse-engineering/grpc-event-query-capture.md "remaining gate".
            // Only the first attempt produced by the request builder is sent.
            byte[] requestBuffer = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
                startUtc.ToUniversalTime(),
                endUtc.ToUniversalTime(),
                eventCount: 100,
                filter,
                version: 6)[0].RequestBuffer;

            GrpcRetrieval.StartEventQueryResponse startResponse = retrievalClient.StartEventQuery(
                new GrpcRetrieval.StartEventQueryRequest
                {
                    UiHandle = session.ClientHandle,
                    UiQueryRequestType = HistorianEventQueryProtocol.QueryRequestTypeEvent,
                    BtRequest = ByteString.CopyFrom(requestBuffer)
                },
                rconn.Metadata, Deadline(), cancellationToken);

            byte[] startError = startResponse.Status?.BtError?.ToByteArray() ?? [];
            if (!(startResponse.Status?.BSuccess ?? false))
            {
                throw new InvalidOperationException(
                    $"gRPC StartEventQuery failed (errorLen={startError.Length}, error5={HistorianEventRegistrationProtocol.DescribeNativeError(startError)}).");
            }

            uint queryHandle = startResponse.UiQueryHandle;
            RegistrationDiag += $"QH={queryHandle} clientH={session.ClientHandle} strH={session.StringHandle}; ";

            try
            {
                List<HistorianEvent> events = [];
                while (true)
                {
                    cancellationToken.ThrowIfCancellationRequested();

                    GrpcRetrieval.GetNextEventQueryResultBufferResponse nextResponse;
                    try
                    {
                        nextResponse = retrievalClient.GetNextEventQueryResultBuffer(
                            new GrpcRetrieval.GetNextEventQueryResultBufferRequest
                            {
                                UiHandle = session.ClientHandle,
                                UiQueryHandle = queryHandle
                            },
                            rconn.Metadata, EventPollDeadline(), cancellationToken);
                    }
                    catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
                    {
                        // No-data terminal. Over gRPC the 2023 R2 server LONG-POLLS GetNextEventQueryResultBuffer
                        // when the query has no (more) rows to hand back, rather than returning the 5-byte
                        // type=4 code=85 terminal the 2020 WCF op returns synchronously. A poll-deadline
                        // expiry is therefore the gRPC equivalent of that soft terminal: stop reading and
                        // return whatever rows were already collected. (Confirmed live 2026-06-22: the chain
                        // runs and StartEventQuery succeeds, but GetNext blocks to the deadline on the idle
                        // dev box, which holds no events.)
                        LastErrorBufferDescription = "GetNext long-poll deadline (no-data terminal)";
                        return events;
                    }

                    byte[] resultBuffer = nextResponse.BtResult?.ToByteArray() ?? [];
                    byte[] errorBuffer = nextResponse.Status?.BtError?.ToByteArray() ?? [];
                    bool nextSuccess = nextResponse.Status?.BSuccess ?? false;

                    LastResultBufferLength = resultBuffer.Length;
                    LastErrorBufferDescription = HistorianEventRegistrationProtocol.DescribeNativeError(errorBuffer);
                    LastResultBufferHex = Convert.ToHexString(resultBuffer.Length <= 48 ? resultBuffer : resultBuffer[..48]);
                    LastErrorBufferHex = Convert.ToHexString(errorBuffer);

                    // Any 5-byte type=4 error is a soft terminal (code 30 NoMoreData is canonical; code
                    // 85 / 0x55 is the missing-registration signal seen on early runs). Mirror the WCF
                    // orchestrator: stop reading and surface the diagnostic rather than throw.
                    if (errorBuffer.Length == 5 && errorBuffer[0] == 4)
                    {
                        return events;
                    }

                    if (!nextSuccess)
                    {
                        throw new InvalidOperationException(
                            $"gRPC GetNextEventQueryResultBuffer failed (errorLen={errorBuffer.Length}, error5={HistorianEventRegistrationProtocol.DescribeNativeError(errorBuffer)}).");
                    }

                    if (resultBuffer.Length > 0)
                    {
                        events.AddRange(HistorianEventRowProtocol.Parse(resultBuffer));
                    }

                    // A successful response carrying neither rows nor an error ends the read.
                    if (resultBuffer.Length == 0 && errorBuffer.Length == 0)
                    {
                        return events;
                    }
                }
            }
            finally
            {
                EndEventQuerySafely(retrievalClient, rconn, session.ClientHandle, queryHandle);
            }
        }
        finally
        {
            // The shared connection is owned (and disposed) by RunEventChain; only the dedicated
            // split-channel connection created above is disposed here.
            if (splitChannel)
            {
                rconn.Dispose();
            }
        }
    }

    /// <summary>
    /// Best-effort EndEventQuery. Runs even after the read was cancelled, so it deliberately does not
    /// use the read's token; instead it is bounded by its own short timer (at most 5s, or the
    /// configured request timeout if shorter), applied both as the RPC deadline and as a cancellation
    /// token. This method runs in a <c>finally</c> on the read path, so an unbounded cleanup call
    /// would extend the read past its overall budget. Never throws.
    /// </summary>
    private void EndEventQuerySafely(
        GrpcRetrieval.RetrievalService.RetrievalServiceClient client,
        HistorianGrpcConnection connection,
        uint clientHandle,
        uint queryHandle)
    {
        try
        {
            TimeSpan cleanupBudget = BoundedTimeout(CleanupCap);
            using CancellationTokenSource cleanupTimer = new CancellationTokenSource(cleanupBudget);
            client.EndEventQuery(
                new GrpcRetrieval.EndEventQueryRequest
                {
                    UiHandle = clientHandle,
                    UiQueryHandle = queryHandle
                },
                connection.Metadata, DateTime.UtcNow.Add(cleanupBudget), cleanupTimer.Token);
        }
        catch
        {
            // Best-effort cleanup; the read result is already collected.
        }
    }

    /// <summary>
    /// Invokes <paramref name="action"/> and discards any exception it throws. Used for registration
    /// RPCs whose individual outcome must not abort the chain.
    /// </summary>
    private static void TryRun(Action action)
    {
        try
        {
            action();
        }
        catch
        {
            // Deliberately swallowed: the registration replay is best-effort.
        }
    }
}