SendEvent over gRPC: implement + live-validate (was capture-gated)

Captured the native 2023 R2 client's gRPC event send (new capture-send-event
harness scenario): it rides HistoryService.AddStreamValues with the SAME "OS"
(0x534F) storage-sample buffer the WCF path already uses (HistorianEventWrite-
Protocol) — confirming "no distinct RPC", and that it is NOT the historical
write's "ON" buffer. Diffed the write-enabled vs read-only v8 Event open: byte-
identical apart from per-session crypto, so the existing OpenSession event path
is reused unchanged.

So SendEvent-over-gRPC was pure assembly of proven parts:
- HistorianGrpcEventWriteOrchestrator = v8 Event open + CM_EVENT registration
  (UpdC3/RegisterTags/EnsureTags) + AddStreamValues(OS buffer).
- HistorianClient.SendEventAsync now routes to it for RemoteGrpc (WCF otherwise).

Live-validated end-to-end against the 2023 R2 server: pure-managed SDK send →
AddStreamValues BSuccess=true → the event reads back from the server (markers
confirmed in returned event rows). The native gRPC RegisterTags(24B) +
EnsureTags(86B) byte-match our serializers (new GrpcEventSendProtocolTests
golden, closing the 83-vs-86 EnsureTags question). Gated live test
SendEventAsync_OverGrpc_AcceptsEvent (opt-in HISTORIAN_GRPC_EVENT_SEND=1).
331 offline tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
Joseph Doherty
2026-06-23 15:37:22 -04:00
parent ae536bb4b8
commit afc7c4bf96
6 changed files with 384 additions and 21 deletions
@@ -0,0 +1,136 @@
using Google.Protobuf;
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Wcf;
using GrpcHistory = ArchestrA.Grpc.Contract.History;
using GrpcStatus = ArchestrA.Grpc.Contract.Status;
namespace AVEVA.Historian.Client.Grpc;
/// <summary>
/// 2023 R2 gRPC orchestrator for the event SEND (<see cref="HistorianClient.SendEventAsync"/>).
/// Captured live from the native 2023 R2 client (<c>capture-send-event</c> scenario,
/// 2026-06-23): an event send rides <c>HistoryService.AddStreamValues</c> with the SAME
/// <c>"OS"</c> (0x534F) storage-sample buffer the WCF AddS2 path uses
/// (<see cref="HistorianEventWriteProtocol"/>) — NOT a distinct event RPC and NOT the historical
/// write's <c>"ON"</c> buffer. The native client's write-enabled Event <c>OpenConnection</c>
/// request is byte-identical to the read-only event open (the ReadOnly arg does not change the v8
/// open buffer; diffed live — only the per-session client key + credential token differ), so the
/// existing <see cref="HistorianGrpcHandshake.OpenSession"/> event path is reused unchanged. The
/// chain on a single Event session:
/// <list type="number">
/// <item>OpenConnection (v8 Event, ExchangeKey ECDH auth) → string storage handle</item>
/// <item>CM_EVENT registration: UpdateClientStatus → RegisterTags → EnsureTags (the same
/// buffers the gRPC event READ replays — verified byte-identical to the capture)</item>
/// <item><c>HistoryService.AddStreamValues</c>(strHandle, "OS" event buffer)</item>
/// </list>
/// Only original events (<see cref="HistorianEvent.RevisionVersion"/> = 0) with string-valued
/// properties have a captured encoding; others throw <see cref="ProtocolEvidenceMissingException"/>
/// from <see cref="HistorianEventWriteProtocol"/>.
/// </summary>
internal sealed class HistorianGrpcEventWriteOrchestrator
{
private readonly HistorianClientOptions _options;
public HistorianGrpcEventWriteOrchestrator(HistorianClientOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
/// <summary>Diagnostic: type+code description of the most recent AddStreamValues error buffer.</summary>
public string LastSendErrorDescription { get; private set; } = string.Empty;
/// <summary>Diagnostic: outcomes of the CM_EVENT registration RPCs.</summary>
public string RegistrationDiag { get; private set; } = string.Empty;
public Task<bool> SendEventAsync(HistorianEvent evt, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(evt);
if (!_options.IntegratedSecurity && string.IsNullOrEmpty(_options.UserName))
{
throw new ProtocolEvidenceMissingException(
"Managed gRPC event send currently requires IntegratedSecurity or an explicit UserName + Password.");
}
if (evt.RevisionVersion != 0)
{
throw new ProtocolEvidenceMissingException(
"Only original events (RevisionVersion = 0) have a captured send encoding; " +
"revision/update/delete event sends are not yet supported.");
}
return Task.Run(() => Run(evt, cancellationToken), cancellationToken);
}
private bool Run(HistorianEvent evt, CancellationToken cancellationToken)
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
// The event SEND uses the same v8 Event connection as the event READ. The write-enabled
// open buffer is byte-identical to the read-only one (verified live), so OpenSession's
// event path is reused unchanged.
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
connection, _options, cancellationToken, eventConnection: true);
RegisterCmEventTag(connection, session, cancellationToken);
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
byte[] pBuf = HistorianEventWriteProtocol.SerializeAddStreamValuesBuffer(evt, DateTime.UtcNow);
GrpcHistory.AddStreamValuesResponse response = historyClient.AddStreamValues(
new GrpcHistory.AddStreamValuesRequest
{
StrHandle = session.StringHandle,
BtValues = ByteString.CopyFrom(pBuf),
},
connection.Metadata,
DateTime.UtcNow.Add(_options.RequestTimeout),
cancellationToken);
byte[] error = response.Status?.BtError?.ToByteArray() ?? [];
LastSendErrorDescription = HistorianEventRegistrationProtocol.DescribeNativeError(error);
return response.Status?.BSuccess ?? false;
}
/// <summary>
/// Replays the CM_EVENT registration the native event connection performs before a send:
/// UpdateClientStatus → RegisterTags(CM_EVENT) → EnsureTags(CM_EVENT). The buffers are shared
/// with the gRPC event READ path (<see cref="HistorianEventRegistrationProtocol"/> +
/// <see cref="HistorianAddTagsProtocol.SerializeCmEventEnsureTagsGrpc"/>) and were verified
/// byte-identical to the live capture. Best-effort: an individual rejection does not abort the
/// send (the server may already hold CM_EVENT registered for the session).
/// </summary>
private void RegisterCmEventTag(HistorianGrpcConnection connection, HistorianGrpcHandshake.Session session, CancellationToken cancellationToken)
{
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout);
byte[] clientStatus = HistorianEventRegistrationProtocol.BuildUpdateClientStatusBlob();
try
{
historyClient.UpdateClientStatus(
new GrpcHistory.UpdateClientStatusRequest { StrHandle = session.StringHandle, BtClientStatus = ByteString.CopyFrom(clientStatus) },
connection.Metadata, Deadline(), cancellationToken);
}
catch { /* best-effort */ }
byte[] registerBuffer = HistorianEventRegistrationProtocol.BuildRegisterCmEventInputBuffer();
try
{
GrpcHistory.RegisterTagsResponse rt = historyClient.RegisterTags(
new GrpcHistory.RegisterTagsRequest { StrHandle = session.StringHandle, BtTagInfos = ByteString.CopyFrom(registerBuffer) },
connection.Metadata, Deadline(), cancellationToken);
RegistrationDiag += $"RTag={rt.Status?.BSuccess}; ";
}
catch (Exception ex) { RegistrationDiag += $"RTag=EX:{ex.GetType().Name}; "; }
byte[] payload = HistorianAddTagsProtocol.SerializeCmEventEnsureTagsGrpc(DateTime.UtcNow);
try
{
GrpcHistory.EnsureTagsResponse et = historyClient.EnsureTags(
new GrpcHistory.EnsureTagsRequest { StrHandle = session.StringHandle, BtTagInfos = ByteString.CopyFrom(payload), ElementCount = 1 },
connection.Metadata, Deadline(), cancellationToken);
RegistrationDiag += $"EnsT={et.Status?.BSuccess}; ";
}
catch (Exception ex) { RegistrationDiag += $"EnsT=EX:{ex.GetType().Name}; "; }
}
}
+13 -7
View File
@@ -114,18 +114,24 @@ public sealed class HistorianClient : IAsyncDisposable
}
/// <summary>
/// Sends a single <see cref="HistorianEvent"/> to the Historian's built-in CM_EVENT tag
/// over the WCF event pipeline (Open2 event mode → CM_EVENT registration → AddS2). The
/// event is appended to the historian's event history and is readable back via
/// <see cref="ReadEventsAsync"/> / the <c>v_AlarmEventHistory2</c> view. Only original
/// events (<see cref="HistorianEvent.RevisionVersion"/> = 0) with string-valued properties
/// are supported; other property value types and revision/update/delete events throw
/// Sends a single <see cref="HistorianEvent"/> to the Historian's built-in CM_EVENT tag.
/// Over WCF this runs Open2 event mode → CM_EVENT registration → AddS2; over the 2023 R2
/// <see cref="HistorianTransport.RemoteGrpc"/> transport it runs the captured-equivalent
/// v8 Event OpenConnection → CM_EVENT registration → <c>HistoryService.AddStreamValues</c>
/// with the same "OS" event buffer (live-captured 2026-06-23 — the send rides the same RPC
/// and buffer as the WCF path, not a distinct event RPC). The event is appended to the
/// historian's event history and is readable back via <see cref="ReadEventsAsync"/> /
/// the <c>v_AlarmEventHistory2</c> view. Only original events
/// (<see cref="HistorianEvent.RevisionVersion"/> = 0) with string-valued properties are
/// supported; other property value types and revision/update/delete events throw
/// <see cref="ProtocolEvidenceMissingException"/> until their wire encoding is captured.
/// </summary>
public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(historianEvent);
return new HistorianWcfEventOrchestrator(_options).SendEventAsync(historianEvent, cancellationToken);
return _options.Transport == HistorianTransport.RemoteGrpc
? new Grpc.HistorianGrpcEventWriteOrchestrator(_options).SendEventAsync(historianEvent, cancellationToken)
: new HistorianWcfEventOrchestrator(_options).SendEventAsync(historianEvent, cancellationToken);
}
/// <summary>