feat: HistorianEventSession primitive + OpenEventSessionAsync (v8 Event reuse)
Reusable v8 Event session wrapping the B0a seams (OpenAndRegisterEventSession once + SendEventOnSession per op) with a GetSystemParameter keepalive; idempotent dispose. Mirrors HistorianSession (v6 sibling). pending.md A1 broadening, Stage B1. Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
@@ -382,6 +382,38 @@ public sealed class HistorianClient : IAsyncDisposable
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Opens a reusable v8 EVENT session (ECDH + RegisterCmEventTag ONCE) over the 2023 R2 gRPC
|
||||
/// transport. The caller owns the session and must dispose it. Reusing the session across sends
|
||||
/// amortizes the ECDH+register cost (~10-16×, spike-proven); the server idle-expires it in ~25s,
|
||||
/// so keep it warm (HistorianEventSession.PingAsync) or re-open. For SendEvent amortization only —
|
||||
/// event reads are gated (C2) and not exposed here. RemoteGrpc only.
|
||||
/// </summary>
|
||||
public async Task<HistorianEventSession> OpenEventSessionAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_options.Transport != HistorianTransport.RemoteGrpc)
|
||||
{
|
||||
throw new ProtocolEvidenceMissingException(
|
||||
"HistorianEventSession is only supported over the 2023 R2 RemoteGrpc transport.");
|
||||
}
|
||||
|
||||
return await Task.Run(() =>
|
||||
{
|
||||
Grpc.HistorianGrpcConnection connection = Grpc.HistorianGrpcChannelFactory.Create(_options);
|
||||
try
|
||||
{
|
||||
var orch = new Grpc.HistorianGrpcEventWriteOrchestrator(_options);
|
||||
Grpc.HistorianGrpcHandshake.Session session = orch.OpenAndRegisterEventSession(connection, cancellationToken);
|
||||
return new HistorianEventSession(connection, session, _options);
|
||||
}
|
||||
catch
|
||||
{
|
||||
connection.Dispose(); // don't leak the channel if the handshake fails
|
||||
throw;
|
||||
}
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
using AVEVA.Historian.Client.Grpc;
|
||||
using AVEVA.Historian.Client.Models;
|
||||
|
||||
namespace AVEVA.Historian.Client;
|
||||
|
||||
/// <summary>A live, reusable authenticated v8 EVENT session: holds one event gRPC connection + one
|
||||
/// open+registered Event handle and runs SendEvent on it WITHOUT re-handshaking. Reuse amortizes the
|
||||
/// ECDH+register cost (~10-16×, spike-proven). SendEvent only — event READS are gated (C2) and stay
|
||||
/// per-call. Keep in sync with <see cref="HistorianSession"/> (the v6 sibling).</summary>
|
||||
public sealed class HistorianEventSession : IAsyncDisposable
|
||||
{
|
||||
private readonly HistorianGrpcConnection _connection;
|
||||
private readonly HistorianGrpcHandshake.Session _session;
|
||||
private readonly HistorianClientOptions _options;
|
||||
private int _disposed;
|
||||
|
||||
internal HistorianEventSession(
|
||||
HistorianGrpcConnection connection, HistorianGrpcHandshake.Session session, HistorianClientOptions options)
|
||||
{
|
||||
_connection = connection;
|
||||
_session = session;
|
||||
_options = options;
|
||||
}
|
||||
|
||||
/// <summary>Exposes the held event gRPC connection for internal callers (e.g. the round-trip test
|
||||
/// verifying the keepalive op directly). Not part of the public surface.</summary>
|
||||
internal HistorianGrpcConnection Connection => _connection;
|
||||
|
||||
/// <summary>Exposes the held open+registered Event session handle for internal callers (e.g. the
|
||||
/// round-trip test verifying the keepalive op directly). Not part of the public surface.</summary>
|
||||
internal HistorianGrpcHandshake.Session Session => _session;
|
||||
|
||||
/// <summary>Sends one event on the held (open+registered) v8 Event session.</summary>
|
||||
public Task<bool> SendEventAsync(HistorianEvent evt, CancellationToken ct = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(evt);
|
||||
if (evt.RevisionVersion != 0)
|
||||
{
|
||||
throw new ProtocolEvidenceMissingException(
|
||||
"Only original events (RevisionVersion = 0) have a captured send encoding; " +
|
||||
"revision/update/delete event sends are not yet supported.");
|
||||
}
|
||||
|
||||
var orch = new HistorianGrpcEventWriteOrchestrator(_options);
|
||||
return Task.Run(() => orch.SendEventOnSession(_connection, _session, evt, ct), ct);
|
||||
}
|
||||
|
||||
/// <summary>Keepalive via a lightweight <c>GetSystemParameter</c> status read on the event session's
|
||||
/// <see cref="HistorianGrpcHandshake.Session.ClientHandle"/> (the same status op the native pre-query
|
||||
/// sequence issues against an authenticated Event session), under the server idle floor. Mirrors
|
||||
/// <see cref="HistorianSession.PingAsync"/>. The op's effectiveness on a v8 Event handle is
|
||||
/// live-verified by the round-trip test.</summary>
|
||||
public Task PingAsync(CancellationToken ct = default)
|
||||
=> Task.Run(() => HistorianGrpcStatusClient.GetSystemParameterOnSession(
|
||||
_connection, _session.ClientHandle, _options, "HistorianVersion", ct), ct);
|
||||
|
||||
/// <summary>Disposes the underlying event connection (idempotent).</summary>
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
if (Interlocked.Exchange(ref _disposed, 1) == 0)
|
||||
{
|
||||
_connection.Dispose();
|
||||
}
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
using AVEVA.Historian.Client.Grpc;
|
||||
using AVEVA.Historian.Client.Models;
|
||||
using Xunit.Abstractions;
|
||||
|
||||
namespace AVEVA.Historian.Client.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Live end-to-end round-trip for <see cref="HistorianEventSession"/> (the v8 EVENT sibling of
|
||||
/// <see cref="HistorianSessionRoundTripTests"/>): open ONE reusable event session, SendEvent on it
|
||||
/// TWICE (no re-handshake/re-register between sends), ping once, dispose. Env-gated exactly like
|
||||
/// <see cref="EventSessionReuseSpikeTests"/> (silent early-return skip without HISTORIAN_GRPC_HOST +
|
||||
/// HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG). Every send targets the sandbox
|
||||
/// identity ONLY (carried as the event SourceName/Type), never a production tag.
|
||||
/// </summary>
|
||||
public sealed class HistorianEventSessionRoundTripTests
|
||||
{
|
||||
private readonly ITestOutputHelper _output;
|
||||
|
||||
public HistorianEventSessionRoundTripTests(ITestOutputHelper output) => _output = output;
|
||||
|
||||
[Fact]
|
||||
public async Task EventSession_SendTwicePing_AllOnOneSession()
|
||||
{
|
||||
string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
|
||||
string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG");
|
||||
if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag)
|
||||
|| string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER"))
|
||||
|| string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD")))
|
||||
{
|
||||
return; // skip — env not configured
|
||||
}
|
||||
|
||||
HistorianClientOptions options = BuildOptions(host);
|
||||
await using var client = new HistorianClient(options);
|
||||
await using HistorianEventSession session = await client.OpenEventSessionAsync(CancellationToken.None);
|
||||
|
||||
// 1) send TWICE on the SAME (open + CM_EVENT-registered) session — the 2nd skips ECDH+register.
|
||||
for (int i = 0; i < 2; i++)
|
||||
{
|
||||
bool sent = await session.SendEventAsync(BuildSandboxEvent(sandboxTag, attempt: i), CancellationToken.None);
|
||||
_output.WriteLine($"{i + 1}) reused-send[{i}] -> ok={sent}");
|
||||
Assert.True(sent, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess).");
|
||||
}
|
||||
|
||||
// 2) ping on the SAME session — must not throw.
|
||||
await session.PingAsync(CancellationToken.None);
|
||||
_output.WriteLine("3) ping -> ok");
|
||||
|
||||
// 3) prove the keepalive op actually RETURNS DATA on the v8 Event handle (not just no-throw):
|
||||
// issue the same underlying GetSystemParameter the ping uses, directly against the event
|
||||
// session's connection + ClientHandle, and assert it yields a non-empty value.
|
||||
string? keepalive = HistorianGrpcStatusClient.GetSystemParameterOnSession(
|
||||
session.Connection, session.Session.ClientHandle, options, "HistorianVersion", CancellationToken.None);
|
||||
Assert.False(string.IsNullOrEmpty(keepalive));
|
||||
_output.WriteLine($"4) keepalive GetSystemParameter on event handle -> '{keepalive}'");
|
||||
|
||||
_output.WriteLine("event-session round-trip OK (two sends + ping + verified keepalive on one session)");
|
||||
}
|
||||
|
||||
// Build a send event that targets the sandbox identity ONLY (mirrors EventSessionReuseSpikeTests.
|
||||
// BuildSandboxEvent): the CM_EVENT send buffer carries no per-tag routing field, so the sandbox tag
|
||||
// NAME is stamped into SourceName + Type + a marker Property so the appended event is unambiguously
|
||||
// attributable to the sandbox — never a production tag. A fresh Id/timestamps per attempt.
|
||||
private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt)
|
||||
{
|
||||
DateTime now = DateTime.UtcNow;
|
||||
return new HistorianEvent(
|
||||
Id: Guid.NewGuid(),
|
||||
EventTimeUtc: now.AddSeconds(-attempt),
|
||||
ReceivedTimeUtc: now,
|
||||
Type: sandboxTag,
|
||||
SourceName: sandboxTag,
|
||||
Namespace: "HistGW.EventSessionRoundTrip",
|
||||
RevisionVersion: 0,
|
||||
Properties: new Dictionary<string, object?>
|
||||
{
|
||||
["RoundTripAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||
["RoundTripMarker"] = "B1-event-session-roundtrip",
|
||||
});
|
||||
}
|
||||
|
||||
// verbatim copy of BuildOptions from HistorianSessionRoundTripTests / EventSessionReuseSpikeTests
|
||||
private static HistorianClientOptions BuildOptions(string host)
|
||||
{
|
||||
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
|
||||
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
|
||||
bool explicitCreds = !string.IsNullOrEmpty(user);
|
||||
int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed)
|
||||
? parsed
|
||||
: HistorianClientOptions.DefaultGrpcPort;
|
||||
bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase);
|
||||
TimeSpan timeout = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TIMEOUT"), out int secs) && secs > 0
|
||||
? TimeSpan.FromSeconds(secs)
|
||||
: new HistorianClientOptions { Host = host }.RequestTimeout;
|
||||
|
||||
return new HistorianClientOptions
|
||||
{
|
||||
Host = host,
|
||||
Port = port,
|
||||
Transport = HistorianTransport.RemoteGrpc,
|
||||
GrpcUseTls = tls,
|
||||
AllowUntrustedServerCertificate = tls,
|
||||
ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"),
|
||||
IntegratedSecurity = !explicitCreds,
|
||||
UserName = user ?? string.Empty,
|
||||
Password = password ?? string.Empty,
|
||||
RequestTimeout = timeout,
|
||||
Compression = true
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user