feat(grpc): event-on-session seam for the reuse spike (SendEvent[+ReadEvents])

Extract SendEventOnSession (and best-effort RunEventQueryOnSession) so the B0b spike
can run multiple event ops on one already-opened v8 Event session. RegisterCmEventTag
made independently callable. Behaviour-preserving (pending.md A1 broadening, Stage B0).

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-25 10:39:40 -04:00
parent 81aff03748
commit dc4141e718
3 changed files with 90 additions and 3 deletions
@@ -163,7 +163,7 @@ internal sealed class HistorianGrpcEventOrchestrator
RegisterCmEventTag(connection, session, cancellationToken); RegisterCmEventTag(connection, session, cancellationToken);
List<HistorianEvent> events = RunEventQuery(connection, session, startUtc, endUtc, filter, cancellationToken); List<HistorianEvent> events = RunEventQueryOnSession(connection, session, startUtc, endUtc, filter, cancellationToken);
// Honest no-data handling: when the query returns real rows, hand them back. When it instead // Honest no-data handling: when the query returns real rows, hand them back. When it instead
// reaches the no-data terminal with ZERO rows (the gRPC server long-polls GetNext rather than // reaches the no-data terminal with ZERO rows (the gRPC server long-polls GetNext rather than
@@ -273,7 +273,15 @@ internal sealed class HistorianGrpcEventOrchestrator
/// <summary>Diagnostic: outcomes of the key CM_EVENT registration RPCs.</summary> /// <summary>Diagnostic: outcomes of the key CM_EVENT registration RPCs.</summary>
public string RegistrationDiag { get; private set; } = string.Empty; public string RegistrationDiag { get; private set; } = string.Empty;
private List<HistorianEvent> RunEventQuery( // Spike seam (pending.md A1 broadening, Stage B0b): run ONLY the event query (StartEventQuery →
// GetNext loop → EndEventQuery) against an EXTERNALLY-supplied, already-opened + CM_EVENT-registered
// v8 Event connection + session — NO Create()/OpenSession/RegisterCmEventTag here. The per-call
// RunEventChain delegates to this so the per-call read and the B0b reuse spike share one query
// implementation (DRY). NOTE: event reads are otherwise GATED (C2) — the gRPC server long-polls
// GetNext to the no-data terminal and row-level retrieval is not yet verified over gRPC (see class
// remarks); the SEND seam is the spike's primary reuse signal. The split-channel opt-in
// (HISTORIAN_GRPC_EVENT_SPLIT_CHANNEL) is preserved inside, unchanged.
internal List<HistorianEvent> RunEventQueryOnSession(
HistorianGrpcConnection connection, HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session, HistorianGrpcHandshake.Session session,
DateTime startUtc, DateTime startUtc,
@@ -67,12 +67,41 @@ internal sealed class HistorianGrpcEventWriteOrchestrator
// The event SEND uses the same v8 Event connection as the event READ. The write-enabled // The event SEND uses the same v8 Event connection as the event READ. The write-enabled
// open buffer is byte-identical to the read-only one (verified live), so OpenSession's // open buffer is byte-identical to the read-only one (verified live), so OpenSession's
// event path is reused unchanged. // event path is reused unchanged. Per-call: open + register + send on a fresh session.
HistorianGrpcHandshake.Session session = OpenAndRegisterEventSession(connection, cancellationToken);
return SendEventOnSession(connection, session, evt, cancellationToken);
}
// Spike seam (pending.md A1 broadening, Stage B0b): open a v8 Event connection and drive the
// CM_EVENT registration ONCE, returning the warm (connection, session). The per-call Run() uses
// it for a single send; the B0b reuse spike calls it once and then issues MULTIPLE
// SendEventOnSession ops against the returned session to measure whether a v8 Event session can
// be reused across sends (it has NEVER been proven reusable — that is exactly what B0b measures).
// The caller owns the connection's lifetime (dispose it).
internal HistorianGrpcHandshake.Session OpenAndRegisterEventSession(
HistorianGrpcConnection connection,
CancellationToken cancellationToken)
{
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession( HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
connection, _options, cancellationToken, eventConnection: true); connection, _options, cancellationToken, eventConnection: true);
RegisterCmEventTag(connection, session, cancellationToken); RegisterCmEventTag(connection, session, cancellationToken);
return session;
}
// Spike seam (pending.md A1 broadening, Stage B0b): perform ONLY the event send against an
// EXTERNALLY-supplied, already-opened + CM_EVENT-registered v8 Event connection + session —
// i.e. NO Create(), NO OpenSession(eventConnection:true), NO RegisterCmEventTag inside it. The
// per-call Run() path delegates here so the per-call send and the B0b reuse-spike send share one
// implementation (DRY) and stay byte-identical. The spike drives this repeatedly on one warm
// session to measure whether the server honors a reused v8 Event session for multiple sends.
internal bool SendEventOnSession(
HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session,
HistorianEvent evt,
CancellationToken cancellationToken)
{
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel); var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
byte[] pBuf = HistorianEventWriteProtocol.SerializeAddStreamValuesBuffer(evt, DateTime.UtcNow); byte[] pBuf = HistorianEventWriteProtocol.SerializeAddStreamValuesBuffer(evt, DateTime.UtcNow);
@@ -0,0 +1,50 @@
using System.Reflection;
using AVEVA.Historian.Client.Grpc;
using Xunit;
namespace AVEVA.Historian.Client.Tests;
/// <summary>
/// Reflection guard for the event-on-session seams the B0b reuse spike drives (pending.md A1
/// broadening, Stage B0). Mirrors <see cref="TagClientOnSessionSeamTests"/>: the seam runs ONLY the
/// op against an externally-supplied (connection, session), so the spike can run MULTIPLE event ops
/// on one already-opened + registered v8 Event session to measure reuse.
/// </summary>
public class EventOnSessionSeamTests
{
private static MethodInfo RequireMethod(Type owner, string name)
{
MethodInfo? m = owner.GetMethod(
name, BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.Public | BindingFlags.Static);
Assert.NotNull(m);
return m!;
}
[Fact]
public void SendEventOnSession_ExposesSeam_WithConnectionAndSessionFirst()
{
MethodInfo m = RequireMethod(typeof(HistorianGrpcEventWriteOrchestrator), "SendEventOnSession");
ParameterInfo[] ps = m.GetParameters();
Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name);
Assert.Equal("Session", ps[1].ParameterType.Name);
}
[Fact]
public void OpenAndRegisterEventSession_ExposesRegisterOnceSeam()
{
// The spike registers CM_EVENT ONCE via this helper, then issues many SendEventOnSession ops.
MethodInfo m = RequireMethod(typeof(HistorianGrpcEventWriteOrchestrator), "OpenAndRegisterEventSession");
ParameterInfo[] ps = m.GetParameters();
Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name);
Assert.Equal("Session", m.ReturnType.Name);
}
[Fact]
public void RunEventQueryOnSession_ExposesSeam_WithConnectionAndSessionFirst()
{
MethodInfo m = RequireMethod(typeof(HistorianGrpcEventOrchestrator), "RunEventQueryOnSession");
ParameterInfo[] ps = m.GetParameters();
Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name);
Assert.Equal("Session", ps[1].ParameterType.Name);
}
}