From 2687b2b6d227f08e8875d4a5a977590d2a09dfac Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 25 Jun 2026 11:47:44 -0400 Subject: [PATCH] feat: HistorianEventSession primitive + OpenEventSessionAsync (v8 Event reuse) Reusable v8 Event session wrapping the B0a seams (OpenAndRegisterEventSession once + SendEventOnSession per op) with a GetSystemParameter keepalive; idempotent dispose. Mirrors HistorianSession (v6 sibling). pending.md A1 broadening, Stage B1. Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii --- src/AVEVA.Historian.Client/HistorianClient.cs | 32 +++++ .../HistorianEventSession.cs | 66 +++++++++++ .../HistorianEventSessionRoundTripTests.cs | 111 ++++++++++++++++++ 3 files changed, 209 insertions(+) create mode 100644 src/AVEVA.Historian.Client/HistorianEventSession.cs create mode 100644 tests/AVEVA.Historian.Client.Tests/HistorianEventSessionRoundTripTests.cs diff --git a/src/AVEVA.Historian.Client/HistorianClient.cs b/src/AVEVA.Historian.Client/HistorianClient.cs index 14373c8..e321a13 100644 --- a/src/AVEVA.Historian.Client/HistorianClient.cs +++ b/src/AVEVA.Historian.Client/HistorianClient.cs @@ -382,6 +382,38 @@ public sealed class HistorianClient : IAsyncDisposable }, cancellationToken).ConfigureAwait(false); } + /// + /// Opens a reusable v8 EVENT session (ECDH + RegisterCmEventTag ONCE) over the 2023 R2 gRPC + /// transport. The caller owns the session and must dispose it. Reusing the session across sends + /// amortizes the ECDH+register cost (~10-16×, spike-proven); the server idle-expires it in ~25s, + /// so keep it warm (HistorianEventSession.PingAsync) or re-open. For SendEvent amortization only — + /// event reads are gated (C2) and not exposed here. RemoteGrpc only. + /// + public async Task OpenEventSessionAsync(CancellationToken cancellationToken = default) + { + if (_options.Transport != HistorianTransport.RemoteGrpc) + { + throw new ProtocolEvidenceMissingException( + "HistorianEventSession is only supported over the 2023 R2 RemoteGrpc transport."); + } + + return await Task.Run(() => + { + Grpc.HistorianGrpcConnection connection = Grpc.HistorianGrpcChannelFactory.Create(_options); + try + { + var orch = new Grpc.HistorianGrpcEventWriteOrchestrator(_options); + Grpc.HistorianGrpcHandshake.Session session = orch.OpenAndRegisterEventSession(connection, cancellationToken); + return new HistorianEventSession(connection, session, _options); + } + catch + { + connection.Dispose(); // don't leak the channel if the handshake fails + throw; + } + }, cancellationToken).ConfigureAwait(false); + } + public ValueTask DisposeAsync() { return ValueTask.CompletedTask; diff --git a/src/AVEVA.Historian.Client/HistorianEventSession.cs b/src/AVEVA.Historian.Client/HistorianEventSession.cs new file mode 100644 index 0000000..bfff3d7 --- /dev/null +++ b/src/AVEVA.Historian.Client/HistorianEventSession.cs @@ -0,0 +1,66 @@ +using AVEVA.Historian.Client.Grpc; +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client; + +/// A live, reusable authenticated v8 EVENT session: holds one event gRPC connection + one +/// open+registered Event handle and runs SendEvent on it WITHOUT re-handshaking. Reuse amortizes the +/// ECDH+register cost (~10-16×, spike-proven). SendEvent only — event READS are gated (C2) and stay +/// per-call. Keep in sync with (the v6 sibling). +public sealed class HistorianEventSession : IAsyncDisposable +{ + private readonly HistorianGrpcConnection _connection; + private readonly HistorianGrpcHandshake.Session _session; + private readonly HistorianClientOptions _options; + private int _disposed; + + internal HistorianEventSession( + HistorianGrpcConnection connection, HistorianGrpcHandshake.Session session, HistorianClientOptions options) + { + _connection = connection; + _session = session; + _options = options; + } + + /// Exposes the held event gRPC connection for internal callers (e.g. the round-trip test + /// verifying the keepalive op directly). Not part of the public surface. + internal HistorianGrpcConnection Connection => _connection; + + /// Exposes the held open+registered Event session handle for internal callers (e.g. the + /// round-trip test verifying the keepalive op directly). Not part of the public surface. + internal HistorianGrpcHandshake.Session Session => _session; + + /// Sends one event on the held (open+registered) v8 Event session. + public Task SendEventAsync(HistorianEvent evt, CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(evt); + if (evt.RevisionVersion != 0) + { + throw new ProtocolEvidenceMissingException( + "Only original events (RevisionVersion = 0) have a captured send encoding; " + + "revision/update/delete event sends are not yet supported."); + } + + var orch = new HistorianGrpcEventWriteOrchestrator(_options); + return Task.Run(() => orch.SendEventOnSession(_connection, _session, evt, ct), ct); + } + + /// Keepalive via a lightweight GetSystemParameter status read on the event session's + /// (the same status op the native pre-query + /// sequence issues against an authenticated Event session), under the server idle floor. Mirrors + /// . The op's effectiveness on a v8 Event handle is + /// live-verified by the round-trip test. + public Task PingAsync(CancellationToken ct = default) + => Task.Run(() => HistorianGrpcStatusClient.GetSystemParameterOnSession( + _connection, _session.ClientHandle, _options, "HistorianVersion", ct), ct); + + /// Disposes the underlying event connection (idempotent). + public ValueTask DisposeAsync() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + _connection.Dispose(); + } + return ValueTask.CompletedTask; + } +} diff --git a/tests/AVEVA.Historian.Client.Tests/HistorianEventSessionRoundTripTests.cs b/tests/AVEVA.Historian.Client.Tests/HistorianEventSessionRoundTripTests.cs new file mode 100644 index 0000000..2e2d4cb --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/HistorianEventSessionRoundTripTests.cs @@ -0,0 +1,111 @@ +using AVEVA.Historian.Client.Grpc; +using AVEVA.Historian.Client.Models; +using Xunit.Abstractions; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// Live end-to-end round-trip for (the v8 EVENT sibling of +/// ): open ONE reusable event session, SendEvent on it +/// TWICE (no re-handshake/re-register between sends), ping once, dispose. Env-gated exactly like +/// (silent early-return skip without HISTORIAN_GRPC_HOST + +/// HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG). Every send targets the sandbox +/// identity ONLY (carried as the event SourceName/Type), never a production tag. +/// +public sealed class HistorianEventSessionRoundTripTests +{ + private readonly ITestOutputHelper _output; + + public HistorianEventSessionRoundTripTests(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task EventSession_SendTwicePing_AllOnOneSession() + { + string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST"); + string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG"); + if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag) + || string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")) + || string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"))) + { + return; // skip — env not configured + } + + HistorianClientOptions options = BuildOptions(host); + await using var client = new HistorianClient(options); + await using HistorianEventSession session = await client.OpenEventSessionAsync(CancellationToken.None); + + // 1) send TWICE on the SAME (open + CM_EVENT-registered) session — the 2nd skips ECDH+register. + for (int i = 0; i < 2; i++) + { + bool sent = await session.SendEventAsync(BuildSandboxEvent(sandboxTag, attempt: i), CancellationToken.None); + _output.WriteLine($"{i + 1}) reused-send[{i}] -> ok={sent}"); + Assert.True(sent, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess)."); + } + + // 2) ping on the SAME session — must not throw. + await session.PingAsync(CancellationToken.None); + _output.WriteLine("3) ping -> ok"); + + // 3) prove the keepalive op actually RETURNS DATA on the v8 Event handle (not just no-throw): + // issue the same underlying GetSystemParameter the ping uses, directly against the event + // session's connection + ClientHandle, and assert it yields a non-empty value. + string? keepalive = HistorianGrpcStatusClient.GetSystemParameterOnSession( + session.Connection, session.Session.ClientHandle, options, "HistorianVersion", CancellationToken.None); + Assert.False(string.IsNullOrEmpty(keepalive)); + _output.WriteLine($"4) keepalive GetSystemParameter on event handle -> '{keepalive}'"); + + _output.WriteLine("event-session round-trip OK (two sends + ping + verified keepalive on one session)"); + } + + // Build a send event that targets the sandbox identity ONLY (mirrors EventSessionReuseSpikeTests. + // BuildSandboxEvent): the CM_EVENT send buffer carries no per-tag routing field, so the sandbox tag + // NAME is stamped into SourceName + Type + a marker Property so the appended event is unambiguously + // attributable to the sandbox — never a production tag. A fresh Id/timestamps per attempt. + private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt) + { + DateTime now = DateTime.UtcNow; + return new HistorianEvent( + Id: Guid.NewGuid(), + EventTimeUtc: now.AddSeconds(-attempt), + ReceivedTimeUtc: now, + Type: sandboxTag, + SourceName: sandboxTag, + Namespace: "HistGW.EventSessionRoundTrip", + RevisionVersion: 0, + Properties: new Dictionary + { + ["RoundTripAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture), + ["RoundTripMarker"] = "B1-event-session-roundtrip", + }); + } + + // verbatim copy of BuildOptions from HistorianSessionRoundTripTests / EventSessionReuseSpikeTests + private static HistorianClientOptions BuildOptions(string host) + { + string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER"); + string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"); + bool explicitCreds = !string.IsNullOrEmpty(user); + int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed) + ? parsed + : HistorianClientOptions.DefaultGrpcPort; + bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase); + TimeSpan timeout = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TIMEOUT"), out int secs) && secs > 0 + ? TimeSpan.FromSeconds(secs) + : new HistorianClientOptions { Host = host }.RequestTimeout; + + return new HistorianClientOptions + { + Host = host, + Port = port, + Transport = HistorianTransport.RemoteGrpc, + GrpcUseTls = tls, + AllowUntrustedServerCertificate = tls, + ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"), + IntegratedSecurity = !explicitCreds, + UserName = user ?? string.Empty, + Password = password ?? string.Empty, + RequestTimeout = timeout, + Compression = true + }; + } +}