diff --git a/docs/reverse-engineering/event-session-reuse-spike-results.md b/docs/reverse-engineering/event-session-reuse-spike-results.md new file mode 100644 index 0000000..bdce09e --- /dev/null +++ b/docs/reverse-engineering/event-session-reuse-spike-results.md @@ -0,0 +1,111 @@ +# Event-session reuse spike — live results + +> **Question:** does the 2023 R2 historian honor REUSING one authenticated **v8 Event** +> session (ECDH `ExchangeKey` → RC4 token → `ConnectionType=Event`, then `RegisterCmEventTag`) +> across multiple `SendEvent` ops, instead of the per-op open+register the SDK does today? +> This is the precondition for amortizing the EVENT path (HistorianGateway `pending.md` A1 +> broadening, Stage B0 / B1). +> +> **Verdict: GREEN — a v8 Event session reuses across sends, register-once is sufficient, +> and the amortization is ~10–16×. Event READS stay gated (C2) and are not a reuse signal.** + +**Date:** 2026-06-25 +**Branch:** `feat/amortization-broadening` +**Server:** live 2023 R2 (`wonder-sql-vd03`), RemoteGrpc transport. +**Sandbox identity:** `HISTORIAN_EVENT_SANDBOX_TAG=HistGW.LiveTest.EventSpike` — the CM_EVENT send +buffer has **no per-tag routing field** (it registers against a fixed system tag), so the sandbox +value is stamped into the event `Type`/`SourceName`/`Namespace` + a `SpikeMarker` property as an +**identity marker**; no real tag is written or overwritten. +**Harness:** `tests/AVEVA.Historian.Client.Tests/EventSessionReuseSpikeTests.cs` driving the B0a +seams `HistorianGrpcEventWriteOrchestrator.OpenAndRegisterEventSession` (open v8 Event session + +`RegisterCmEventTag` ONCE) and `SendEventOnSession` (send only — no open/register). + +--- + +## 1. Send reuse — GREEN + +`ReusedEventSession_SendsTwice_SecondSkipsHandshake` **passed** (both runs): one +`OpenAndRegisterEventSession` then **two `SendEventOnSession` on the same v8 Event session** — both +accepted (`AddStreamValues` `BSuccess=true`). + +``` +open+register (ECDH handshake + RegisterCmEventTag) = 242 ms (run 1: 350 ms) +registration diag: RTag=True; EnsT=True +reused-send[0] = 23 ms, ok=True +reused-send[1] = 22 ms, ok=True +``` + +The server accepts the same v8 Event client handle across back-to-back sends. The session handle is +an immutable `readonly record struct (uint ClientHandle, Guid StorageSessionId)`; the send is +stateless on the client side (each call reserializes a fresh `"OS"` buffer), so nothing per-op is +baked into the handle. + +## 2. Amortization — ~10–16× + +The open+register (P-256 ECDH `ExchangeKey` → RC4 credential token → v8 `OpenConnection` → +`RegisterCmEventTag`) costs ~242–350 ms and is paid **once**; a reused send is ~22 ms. So over a +burst of N sends the per-send cost collapses from ~(265 ms open + 22 ms) to ~22 ms — a ~10–16× win +on the send path, same shape as the v6 read/write amortization (`handshake-reuse-spike-results.md`). + +## 3. Register-once is sufficient — GREEN + +`ReusedEventSession_RegisterOnce_ThenSendMany` **passed**: `RegisterCmEventTag` run **once** (inside +`OpenAndRegisterEventSession`), then **three** sends, all accepted. + +``` +register-once send[0] = 25 ms, ok=True +register-once send[1] = 22 ms, ok=True +register-once send[2] = 22 ms, ok=True +``` + +CM_EVENT registration is **session-scoped, not per-send** — the server holds the registration for the +session's lifetime. A reuse pool registers once per warm session, not per op. + +## 4. Idle tolerance — survived ≥25 s (best-effort, single sample) + +`ReusedEventSession_IdleSweep_BestEffort` (log-only): after a send, a **25 s idle gap**, then another +send — **the second send succeeded** (`session SURVIVED the idle gap`). Notable: the v6 read session +idle-expires at a ≥25 s gap (`handshake-reuse-spike-results.md` §3), but this v8 Event session +survived 25 s. This is a single-sample best-effort observation — a keepalive should still ping under +the ~20 s floor for safety margin until the v8 Event idle boundary is characterized more finely. + +## 5. Read-after-send — GATED (C2), not a reuse signal + +`ReusedEventSession_ServesReadAfterSend_BestEffort` (log-only, hard-bounded by a 5 s gRPC deadline + +an 8 s cancellation): the read-after-send on the same session **did not return data** — it cancels at +the bound: + +``` +read-after-send -> swallowed (RpcException Cancelled / OperationCanceled) + => read gated/unverified over gRPC (expected) +``` + +This matches the pre-existing C2 gate: event **reads** over gRPC long-poll `GetNext` to a no-data +terminal and are unverified. So the spike did **not** prove a one-session-serves-both-kinds property +for reads — `SendEvent` is the only trustworthy reuse signal. (An unbounded read hung the first run; +the harness now bounds it so the spike is a clean, re-runnable record.) + +--- + +## 6. Implications for Stage B1 (the event-pool build) + +GREEN → a **separate event-session pool** (the approved B1 approach) is warranted and high-value: + +1. **Amortize `SendEvent` through a bounded event-session pool.** Open+register a v8 Event session + once per warm session; lease it per send op (exclusive, like the v6 pool); reuse across a burst. + ~10–16× on the send path. +2. **Keep the event pool SEPARATE from the v6 pool** (B1, as approved) — different auth (ECDH/v8), + heavier re-handshake on drop, and its own idle characteristics. +3. **`ReadEvents` stays PER-CALL / gated (C2).** Reads are unverified over gRPC regardless of reuse, + so the event pool amortizes **sends only**; `ReadEvents` is unaffected by B1 and stays on the + per-call path. (This refines the design's "route SendEvent + ReadEvents through the pool": only + `SendEvent` is routed; `ReadEvents` remains per-call because it is gated, not because of reuse.) +4. **Keepalive:** ping the warm event session under the idle floor. The cheap keepalive op for the + event channel is TBD in B1 (the v6 pool uses `GetSystemParameter`; the event session's equivalent + warm-touch needs picking — likely a no-op send or a lightweight event-channel status op). +5. **Reactive re-auth:** on an expiry-looking failure, evict + full v8 re-handshake (heavier than the + v6 re-auth — one ECDH + register penalty). + +**Gate decision: GREEN → HistorianGateway A1 Stage B1 (a bounded `HistorianEventSessionPool` for +`SendEvent`, default-on, parallel to the v6 `HistorianSessionPool`) is warranted and earns its own +re-planned design + plan.** diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs index 758b527..1720728 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventOrchestrator.cs @@ -163,7 +163,7 @@ internal sealed class HistorianGrpcEventOrchestrator RegisterCmEventTag(connection, session, cancellationToken); - List events = RunEventQuery(connection, session, startUtc, endUtc, filter, cancellationToken); + List events = RunEventQueryOnSession(connection, session, startUtc, endUtc, filter, cancellationToken); // Honest no-data handling: when the query returns real rows, hand them back. When it instead // reaches the no-data terminal with ZERO rows (the gRPC server long-polls GetNext rather than @@ -273,7 +273,15 @@ internal sealed class HistorianGrpcEventOrchestrator /// Diagnostic: outcomes of the key CM_EVENT registration RPCs. public string RegistrationDiag { get; private set; } = string.Empty; - private List RunEventQuery( + // Spike seam (pending.md A1 broadening, Stage B0b): run ONLY the event query (StartEventQuery → + // GetNext loop → EndEventQuery) against an EXTERNALLY-supplied, already-opened + CM_EVENT-registered + // v8 Event connection + session — NO Create()/OpenSession/RegisterCmEventTag here. The per-call + // RunEventChain delegates to this so the per-call read and the B0b reuse spike share one query + // implementation (DRY). NOTE: event reads are otherwise GATED (C2) — the gRPC server long-polls + // GetNext to the no-data terminal and row-level retrieval is not yet verified over gRPC (see class + // remarks); the SEND seam is the spike's primary reuse signal. The split-channel opt-in + // (HISTORIAN_GRPC_EVENT_SPLIT_CHANNEL) is preserved inside, unchanged. + internal List RunEventQueryOnSession( HistorianGrpcConnection connection, HistorianGrpcHandshake.Session session, DateTime startUtc, diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventWriteOrchestrator.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventWriteOrchestrator.cs index 6971e60..f9f337c 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventWriteOrchestrator.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcEventWriteOrchestrator.cs @@ -67,12 +67,41 @@ internal sealed class HistorianGrpcEventWriteOrchestrator // The event SEND uses the same v8 Event connection as the event READ. The write-enabled // open buffer is byte-identical to the read-only one (verified live), so OpenSession's - // event path is reused unchanged. + // event path is reused unchanged. Per-call: open + register + send on a fresh session. + HistorianGrpcHandshake.Session session = OpenAndRegisterEventSession(connection, cancellationToken); + + return SendEventOnSession(connection, session, evt, cancellationToken); + } + + // Spike seam (pending.md A1 broadening, Stage B0b): open a v8 Event connection and drive the + // CM_EVENT registration ONCE, returning the warm (connection, session). The per-call Run() uses + // it for a single send; the B0b reuse spike calls it once and then issues MULTIPLE + // SendEventOnSession ops against the returned session to measure whether a v8 Event session can + // be reused across sends (it has NEVER been proven reusable — that is exactly what B0b measures). + // The caller owns the connection's lifetime (dispose it). + internal HistorianGrpcHandshake.Session OpenAndRegisterEventSession( + HistorianGrpcConnection connection, + CancellationToken cancellationToken) + { HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession( connection, _options, cancellationToken, eventConnection: true); RegisterCmEventTag(connection, session, cancellationToken); + return session; + } + // Spike seam (pending.md A1 broadening, Stage B0b): perform ONLY the event send against an + // EXTERNALLY-supplied, already-opened + CM_EVENT-registered v8 Event connection + session — + // i.e. NO Create(), NO OpenSession(eventConnection:true), NO RegisterCmEventTag inside it. The + // per-call Run() path delegates here so the per-call send and the B0b reuse-spike send share one + // implementation (DRY) and stay byte-identical. The spike drives this repeatedly on one warm + // session to measure whether the server honors a reused v8 Event session for multiple sends. + internal bool SendEventOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + HistorianEvent evt, + CancellationToken cancellationToken) + { var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel); byte[] pBuf = HistorianEventWriteProtocol.SerializeAddStreamValuesBuffer(evt, DateTime.UtcNow); diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagClient.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagClient.cs index 79cd32e..945bde7 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagClient.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagClient.cs @@ -39,6 +39,27 @@ internal static class HistorianGrpcTagClient private static HistorianTagMetadata? GetTagMetadata(HistorianClientOptions options, string tag, CancellationToken cancellationToken) { byte[] tagInfos = GetTagInfosRaw(options, [tag], cancellationToken); + return ParseTagMetadata(tagInfos); + } + + // Spike/Phase-1 seam (pending.md A1): resolve tag metadata against an EXTERNALLY-supplied, + // already-authenticated connection + session — i.e. NO Create()/handshake here. The per-call + // GetTagMetadata and this seam share the parse tail (ParseTagMetadata) so neither duplicates the + // decode logic (DRY). + internal static HistorianTagMetadata? GetTagMetadataOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + string tag, + HistorianClientOptions options, + CancellationToken cancellationToken) + { + byte[] tagInfos = GetTagInfosRawOnSession(connection, session, [tag], options, cancellationToken); + return ParseTagMetadata(tagInfos); + } + + // Shared parse tail for both the per-call GetTagMetadata and the reuse-path GetTagMetadataOnSession. + private static HistorianTagMetadata? ParseTagMetadata(byte[] tagInfos) + { if (tagInfos.Length < 4) { return null; @@ -69,7 +90,19 @@ internal static class HistorianGrpcTagClient { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken); + return GetTagInfosRawOnSession(connection, session, tags, options, cancellationToken); + } + // Spike/Phase-1 seam (pending.md A1): issue GetTagInfosFromName against an EXTERNALLY-supplied, + // already-authenticated connection + session — i.e. NO Create()/handshake here. GetTagInfosRaw + // delegates to this so the per-call path and the reuse path share one query implementation (DRY). + internal static byte[] GetTagInfosRawOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + IReadOnlyList tags, + HistorianClientOptions options, + CancellationToken cancellationToken) + { var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel); byte[] requestBuffer = BuildTagNamesBuffer(tags); GrpcRetrieval.GetTagInfosFromNameResponse response = retrievalClient.GetTagInfosFromName( @@ -112,6 +145,8 @@ internal static class HistorianGrpcTagClient return Task.Run(() => GetTagExtendedProperties(options, tag, cancellationToken), cancellationToken); } + // No …OnSession seam: extended-properties browse stays per-call (not amortized through the session + // pool — out of A1-broadening scope). Add a seam here only if the pool ever needs to route it. /// /// Issues a single page-0 GetTagExtendedPropertiesFromName call and returns the raw native /// btTeps response buffer (empty when the server reports no rows / non-success). Internal so @@ -222,6 +257,20 @@ internal static class HistorianGrpcTagClient { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken); + return BrowseTagNamesOnSession(connection, session, filter, options, cancellationToken); + } + + // Spike/Phase-1 seam (pending.md A1): drive StartTagQuery → paged QueryTag → EndTagQuery against an + // EXTERNALLY-supplied, already-authenticated connection + session — i.e. NO Create()/handshake here. + // BrowseTagNames delegates to this so the per-call path and the reuse path share one browse + // implementation (DRY). + internal static List BrowseTagNamesOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + string filter, + HistorianClientOptions options, + CancellationToken cancellationToken) + { var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel); DateTime Deadline() => DateTime.UtcNow.Add(options.RequestTimeout); diff --git a/src/AVEVA.Historian.Client/HistorianClient.cs b/src/AVEVA.Historian.Client/HistorianClient.cs index 14373c8..e321a13 100644 --- a/src/AVEVA.Historian.Client/HistorianClient.cs +++ b/src/AVEVA.Historian.Client/HistorianClient.cs @@ -382,6 +382,38 @@ public sealed class HistorianClient : IAsyncDisposable }, cancellationToken).ConfigureAwait(false); } + /// + /// Opens a reusable v8 EVENT session (ECDH + RegisterCmEventTag ONCE) over the 2023 R2 gRPC + /// transport. The caller owns the session and must dispose it. Reusing the session across sends + /// amortizes the ECDH+register cost (~10-16×, spike-proven); the server idle-expires it in ~25s, + /// so keep it warm (HistorianEventSession.PingAsync) or re-open. For SendEvent amortization only — + /// event reads are gated (C2) and not exposed here. RemoteGrpc only. + /// + public async Task OpenEventSessionAsync(CancellationToken cancellationToken = default) + { + if (_options.Transport != HistorianTransport.RemoteGrpc) + { + throw new ProtocolEvidenceMissingException( + "HistorianEventSession is only supported over the 2023 R2 RemoteGrpc transport."); + } + + return await Task.Run(() => + { + Grpc.HistorianGrpcConnection connection = Grpc.HistorianGrpcChannelFactory.Create(_options); + try + { + var orch = new Grpc.HistorianGrpcEventWriteOrchestrator(_options); + Grpc.HistorianGrpcHandshake.Session session = orch.OpenAndRegisterEventSession(connection, cancellationToken); + return new HistorianEventSession(connection, session, _options); + } + catch + { + connection.Dispose(); // don't leak the channel if the handshake fails + throw; + } + }, cancellationToken).ConfigureAwait(false); + } + public ValueTask DisposeAsync() { return ValueTask.CompletedTask; diff --git a/src/AVEVA.Historian.Client/HistorianEventSession.cs b/src/AVEVA.Historian.Client/HistorianEventSession.cs new file mode 100644 index 0000000..bfff3d7 --- /dev/null +++ b/src/AVEVA.Historian.Client/HistorianEventSession.cs @@ -0,0 +1,66 @@ +using AVEVA.Historian.Client.Grpc; +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client; + +/// A live, reusable authenticated v8 EVENT session: holds one event gRPC connection + one +/// open+registered Event handle and runs SendEvent on it WITHOUT re-handshaking. Reuse amortizes the +/// ECDH+register cost (~10-16×, spike-proven). SendEvent only — event READS are gated (C2) and stay +/// per-call. Keep in sync with (the v6 sibling). +public sealed class HistorianEventSession : IAsyncDisposable +{ + private readonly HistorianGrpcConnection _connection; + private readonly HistorianGrpcHandshake.Session _session; + private readonly HistorianClientOptions _options; + private int _disposed; + + internal HistorianEventSession( + HistorianGrpcConnection connection, HistorianGrpcHandshake.Session session, HistorianClientOptions options) + { + _connection = connection; + _session = session; + _options = options; + } + + /// Exposes the held event gRPC connection for internal callers (e.g. the round-trip test + /// verifying the keepalive op directly). Not part of the public surface. + internal HistorianGrpcConnection Connection => _connection; + + /// Exposes the held open+registered Event session handle for internal callers (e.g. the + /// round-trip test verifying the keepalive op directly). Not part of the public surface. + internal HistorianGrpcHandshake.Session Session => _session; + + /// Sends one event on the held (open+registered) v8 Event session. + public Task SendEventAsync(HistorianEvent evt, CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(evt); + if (evt.RevisionVersion != 0) + { + throw new ProtocolEvidenceMissingException( + "Only original events (RevisionVersion = 0) have a captured send encoding; " + + "revision/update/delete event sends are not yet supported."); + } + + var orch = new HistorianGrpcEventWriteOrchestrator(_options); + return Task.Run(() => orch.SendEventOnSession(_connection, _session, evt, ct), ct); + } + + /// Keepalive via a lightweight GetSystemParameter status read on the event session's + /// (the same status op the native pre-query + /// sequence issues against an authenticated Event session), under the server idle floor. Mirrors + /// . The op's effectiveness on a v8 Event handle is + /// live-verified by the round-trip test. + public Task PingAsync(CancellationToken ct = default) + => Task.Run(() => HistorianGrpcStatusClient.GetSystemParameterOnSession( + _connection, _session.ClientHandle, _options, "HistorianVersion", ct), ct); + + /// Disposes the underlying event connection (idempotent). + public ValueTask DisposeAsync() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + _connection.Dispose(); + } + return ValueTask.CompletedTask; + } +} diff --git a/src/AVEVA.Historian.Client/HistorianSession.cs b/src/AVEVA.Historian.Client/HistorianSession.cs index 120bbe6..228fc75 100644 --- a/src/AVEVA.Historian.Client/HistorianSession.cs +++ b/src/AVEVA.Historian.Client/HistorianSession.cs @@ -7,7 +7,7 @@ namespace AVEVA.Historian.Client; /// A live, reusable authenticated Historian session: holds one gRPC connection + one /// OpenConnection handle and runs ops on them WITHOUT re-handshaking. Reuse across ops amortizes the /// auth handshake. Idle-expires server-side in ~20-25s — callers keep it warm (PingAsync) or re-open. -/// Reads/historical-write/tag-write/status only; events are NOT exposed (separate channel+auth). +/// Reads, browse/metadata, historical-write, tag-write and status; events are NOT exposed (separate channel+auth). public sealed class HistorianSession : IAsyncDisposable { private readonly HistorianGrpcConnection _connection; @@ -89,6 +89,28 @@ public sealed class HistorianSession : IAsyncDisposable () => orch.RunAtTimeOnSession(_connection, _session.ClientHandle, tag, timestampsUtc, ct), ct); } + // --- browse / metadata (call the …OnSession seams, which take the full Session for the string handle) --- + + /// Browses tag names matching on the held session. + public async IAsyncEnumerable BrowseTagNamesAsync( + string filter = "*", + [EnumeratorCancellation] CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + List names = await Task.Run( + () => HistorianGrpcTagClient.BrowseTagNamesOnSession(_connection, _session, filter, _options, ct), ct) + .ConfigureAwait(false); + foreach (string name in names) + { + ct.ThrowIfCancellationRequested(); + yield return name; + } + } + + /// Reads metadata for on the held session (null if unknown). + public Task GetTagMetadataAsync(string tag, CancellationToken ct = default) + => Task.Run(() => HistorianGrpcTagClient.GetTagMetadataOnSession(_connection, _session, tag, _options, ct), ct); + // --- writes (the …OnSession seams take the full Session, since the historical write keys on the // string handle + tag GUID and the tag-config ops mix string/uint handles) --- diff --git a/tests/AVEVA.Historian.Client.Tests/EventOnSessionSeamTests.cs b/tests/AVEVA.Historian.Client.Tests/EventOnSessionSeamTests.cs new file mode 100644 index 0000000..4f8311f --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/EventOnSessionSeamTests.cs @@ -0,0 +1,51 @@ +using System.Reflection; +using AVEVA.Historian.Client.Grpc; +using Xunit; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// Reflection guard for the event-on-session seams the B0b reuse spike drives (pending.md A1 +/// broadening, Stage B0). Mirrors : the seam runs ONLY the +/// op against an externally-supplied (connection, session), so the spike can run MULTIPLE event ops +/// on one already-opened + registered v8 Event session to measure reuse. +/// +public class EventOnSessionSeamTests +{ + private static MethodInfo RequireMethod(Type owner, string name) + { + MethodInfo? m = owner.GetMethod( + name, BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.Public | BindingFlags.Static); + Assert.NotNull(m); + return m!; + } + + [Fact] + public void SendEventOnSession_ExposesSeam_WithConnectionAndSessionFirst() + { + MethodInfo m = RequireMethod(typeof(HistorianGrpcEventWriteOrchestrator), "SendEventOnSession"); + ParameterInfo[] ps = m.GetParameters(); + Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name); + Assert.Equal("Session", ps[1].ParameterType.Name); + } + + [Fact] + public void OpenAndRegisterEventSession_ExposesRegisterOnceSeam() + { + // The spike registers CM_EVENT ONCE via this helper, then issues many SendEventOnSession ops. + MethodInfo m = RequireMethod(typeof(HistorianGrpcEventWriteOrchestrator), "OpenAndRegisterEventSession"); + ParameterInfo[] ps = m.GetParameters(); + Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name); + Assert.Equal("CancellationToken", ps[1].ParameterType.Name); + Assert.Equal("Session", m.ReturnType.Name); + } + + [Fact] + public void RunEventQueryOnSession_ExposesSeam_WithConnectionAndSessionFirst() + { + MethodInfo m = RequireMethod(typeof(HistorianGrpcEventOrchestrator), "RunEventQueryOnSession"); + ParameterInfo[] ps = m.GetParameters(); + Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name); + Assert.Equal("Session", ps[1].ParameterType.Name); + } +} diff --git a/tests/AVEVA.Historian.Client.Tests/EventSessionReuseSpikeTests.cs b/tests/AVEVA.Historian.Client.Tests/EventSessionReuseSpikeTests.cs new file mode 100644 index 0000000..66c71eb --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/EventSessionReuseSpikeTests.cs @@ -0,0 +1,255 @@ +using System.Diagnostics; +using Grpc.Core; +using AVEVA.Historian.Client.Grpc; +using AVEVA.Historian.Client.Models; +using Xunit; +using Xunit.Abstractions; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// SPIKE (pending.md A1 broadening, Stage B0b): can ONE v8 Event session be REUSED across many event +/// ops without re-handshaking — the precondition for broadening handshake amortization to the event +/// path? Env-gated exactly like (silent early-return skip +/// without HISTORIAN_GRPC_HOST + HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG). +/// +/// This is the B0b HARNESS only — it is RUN LIVE by a human over VPN in B0c. It SKIPS cleanly offline +/// (no historian contacted, no event sent). It drives the B0a internal seams directly: +/// (open v8 Event session +/// + RegisterCmEventTag ONCE) and +/// (send-only, on the externally-supplied warm session). +/// +/// Spike questions (priority order), mapped to the test methods below: +/// (1) Does a v8 Event session survive REUSE? — +/// (PRIMARY GREEN/RED signal: two sends on one session both succeed; the 2nd skips ECDH+register). +/// (2) Does REGISTER-ONCE work? — +/// (OpenAndRegister once, then SendEventOnSession N× — no per-send re-registration). +/// (3) ONE-KIND best-effort — +/// (can the same session also serve a ReadEvents after a send? LOGGED, never asserted — reads are gated C2). +/// (4) IDLE expiry best-effort — +/// (how long can the session sit idle before a send breaks? LOGGED, never asserted). +/// +/// SAFETY: every send targets the env var HISTORIAN_EVENT_SANDBOX_TAG ONLY (carried as the event +/// SourceName/Type so the appended events are unambiguously attributable to the sandbox identity, never +/// a production tag). Success is ASSERTED for (1)/(2); latency is LOGGED only (no flaky perf gates). +/// +public sealed class EventSessionReuseSpikeTests +{ + private const int SendMany = 3; + private readonly ITestOutputHelper _output; + + public EventSessionReuseSpikeTests(ITestOutputHelper output) => _output = output; + + // (1) REUSE VALIDITY — PRIMARY signal. Open+register ONE v8 Event session, then SendEventOnSession + // TWICE on it with NO re-handshake/re-register between sends. If the server rejects reusing a v8 + // Event session, send #2 fails (false / throws) -> RED finding. Both succeed -> GREEN (event-session + // reuse is sound, the precondition for event amortization). Latency LOGGED so B0c sees the win + // (open+register cost vs the two reused sends). + [Fact] + public void ReusedEventSession_SendsTwice_SecondSkipsHandshake() + { + if (!TryGetEnv(out string host, out string sandboxTag)) return; + HistorianClientOptions options = BuildOptions(host); + var orchestrator = new HistorianGrpcEventWriteOrchestrator(options); + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); + + var swOpen = Stopwatch.StartNew(); + HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None); + swOpen.Stop(); + _output.WriteLine($"open+register (ECDH handshake + RegisterCmEventTag) = {swOpen.ElapsedMilliseconds} ms"); + _output.WriteLine($"registration diag: {orchestrator.RegistrationDiag}"); + + for (int i = 0; i < 2; i++) + { + HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i); + var sw = Stopwatch.StartNew(); + bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None); + sw.Stop(); + _output.WriteLine($"reused-send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}, lastErr='{orchestrator.LastSendErrorDescription}'"); + Assert.True(ok, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess)."); + } + } + + // (2) REGISTER-ONCE. Open+register ONCE, then SendEventOnSession N× — proving RegisterCmEventTag does + // NOT need re-running per send (the seam's whole point). All sends must succeed. + [Fact] + public void ReusedEventSession_RegisterOnce_ThenSendMany() + { + if (!TryGetEnv(out string host, out string sandboxTag)) return; + HistorianClientOptions options = BuildOptions(host); + var orchestrator = new HistorianGrpcEventWriteOrchestrator(options); + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); + HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None); + + for (int i = 0; i < SendMany; i++) + { + HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i); + var sw = Stopwatch.StartNew(); + bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None); + sw.Stop(); + _output.WriteLine($"register-once send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}"); + Assert.True(ok, $"register-once send[{i}] should be accepted without per-send re-registration."); + } + } + + // (3) ONE-KIND PROBE (best-effort). After a send on the warm session, try a ReadEvents on the SAME + // session. Event reads are GATED (C2 — the gRPC server long-polls GetNext to the no-data terminal and + // row-level retrieval is not verified over gRPC), so the outcome (rows or exception) is LOGGED, never + // asserted: the test passes as long as the catch swallows any failure. Records the one-kind finding + // (can one Event session serve both send and read?) for B0c. + [Fact] + public void ReusedEventSession_ServesReadAfterSend_BestEffort() + { + if (!TryGetEnv(out string host, out string sandboxTag)) return; + HistorianClientOptions options = BuildOptions(host); + var writeOrch = new HistorianGrpcEventWriteOrchestrator(options); + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); + HistorianGrpcHandshake.Session session = writeOrch.OpenAndRegisterEventSession(connection, CancellationToken.None); + + bool sent = writeOrch.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 0), CancellationToken.None); + _output.WriteLine($"seed-send before read-probe ok={sent}"); + + // HARD bound so this probe CANNOT hang on the known C2 event-read long-poll (GetNext blocks to the + // no-data terminal on an idle box). Two independent fuses: (1) a read-only options copy with a 5s + // RequestTimeout so each underlying GetNext RPC deadlines quickly (the read orchestrator caps its + // poll deadline at min(10s, RequestTimeout)); (2) an 8s CancellationToken passed as ct so the chain + // is cancelled even if a per-RPC deadline is not honored over a tunnel. Whichever fires first, the + // method returns in ~10s max even when the read never returns data. + HistorianClientOptions readOptions = BuildOptions(host, requestTimeoutOverride: TimeSpan.FromSeconds(5)); + var readOrch = new HistorianGrpcEventOrchestrator(readOptions); + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(8)); + + (DateTime startUtc, DateTime endUtc) = LastSevenDays(); + try + { + List rows = readOrch.RunEventQueryOnSession( + connection, session, startUtc, endUtc, filter: null, readCts.Token); + _output.WriteLine($"read-after-send -> OK (rows={rows.Count}) => ONE-KIND (an Event session serves send AND read)"); + } + catch (Exception ex) when (ex is OperationCanceledException + || (ex is RpcException rpc && rpc.StatusCode == StatusCode.DeadlineExceeded)) + { + // EXPECTED outcome: the read hit its bound (CTS timeout or per-RPC deadline) without returning — + // consistent with the C2 event-read gating (GetNext long-polls to the no-data terminal). This is + // the recorded one-kind finding, NOT a failure. + _output.WriteLine($"read-after-send did not return within the bound (consistent with C2 event-read gating): {ex.GetType().Name}"); + } + catch (Exception ex) + { + // Any other rejection on the reused session is also the finding, not a failure. + _output.WriteLine($"read-after-send -> swallowed ({ex.GetType().Name}: {ex.Message}) => read gated/unverified over gRPC (expected)"); + } + // No assertion: this method's job is to RECORD the one-kind outcome for B0c, not gate on it. + } + + // (4) IDLE-EXPIRY SWEEP (best-effort, log-only). Send, sit idle for a gap, send again; LOG whether the + // 2nd send broke (and after how long). Bounds how long a warm Event session may sit idle before the + // server expires it — informs the keepalive cadence for an event-session pool. Default gap 25s; + // override via HISTORIAN_EVENT_IDLE_SECONDS. NEVER asserted (a break is the finding, not a failure). + [Fact] + [Trait("Category", "LiveSpike")] + public void ReusedEventSession_IdleSweep_BestEffort() + { + if (!TryGetEnv(out string host, out string sandboxTag)) return; + HistorianClientOptions options = BuildOptions(host); + var orchestrator = new HistorianGrpcEventWriteOrchestrator(options); + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); + HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None); + + bool first = orchestrator.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 0), CancellationToken.None); + _output.WriteLine($"idle-sweep first send ok={first}"); + + int idleSec = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_EVENT_IDLE_SECONDS"), out int parsed) && parsed > 0 + ? parsed + : 25; + _output.WriteLine($"idle-sweep: sleeping {idleSec}s before the second send..."); + Thread.Sleep(TimeSpan.FromSeconds(idleSec)); + + try + { + bool second = orchestrator.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 1), CancellationToken.None); + _output.WriteLine($"idle {idleSec}s -> second send ok={second} (session {(second ? "SURVIVED" : "rejected")} the idle gap)"); + } + catch (Exception ex) + { + _output.WriteLine($"idle {idleSec}s -> second send BROKE ({ex.GetType().Name}: {ex.Message}) — session expired while idle"); + } + // No assertion: idle-expiry timing is a LOGGED finding for the keepalive cadence, not a gate. + } + + // --- helpers --- + + // Build a send event that targets the sandbox identity ONLY. The CM_EVENT send buffer carries no + // per-tag routing field (it registers against the CM_EVENT system tag), so we stamp the sandbox tag + // NAME into SourceName + Type and a marker Property so the appended event is unambiguously + // attributable to the sandbox — never a production tag. A fresh Id/timestamps per attempt. + private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt) + { + DateTime now = DateTime.UtcNow; + return new HistorianEvent( + Id: Guid.NewGuid(), + EventTimeUtc: now.AddSeconds(-attempt), + ReceivedTimeUtc: now, + Type: sandboxTag, + SourceName: sandboxTag, + Namespace: "HistGW.EventReuseSpike", + RevisionVersion: 0, + Properties: new Dictionary + { + ["SpikeAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture), + ["SpikeMarker"] = "B0b-event-session-reuse", + }); + } + + private static bool TryGetEnv(out string host, out string sandboxTag) + { + host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? ""; + sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG") ?? ""; + return !string.IsNullOrWhiteSpace(host) + && !string.IsNullOrWhiteSpace(sandboxTag) + && !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")) + && !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD")); + } + + private static (DateTime StartUtc, DateTime EndUtc) LastSevenDays() + { + DateTime end = DateTime.UtcNow; + return (end - TimeSpan.FromDays(7), end); + } + + // requestTimeoutOverride: when set, forces RequestTimeout (used by the read-after-send probe to give + // each GetNext RPC a short deadline). null preserves the env-driven default for the send/idle methods. + private static HistorianClientOptions BuildOptions(string host, TimeSpan? requestTimeoutOverride = null) + { + string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER"); + string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"); + bool explicitCreds = !string.IsNullOrEmpty(user); + int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed) + ? parsed + : HistorianClientOptions.DefaultGrpcPort; + bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase); + TimeSpan timeout = requestTimeoutOverride + ?? (int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TIMEOUT"), out int secs) && secs > 0 + ? TimeSpan.FromSeconds(secs) + : new HistorianClientOptions { Host = host }.RequestTimeout); + + return new HistorianClientOptions + { + Host = host, + Port = port, + Transport = HistorianTransport.RemoteGrpc, + GrpcUseTls = tls, + AllowUntrustedServerCertificate = tls, + ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"), + IntegratedSecurity = !explicitCreds, + UserName = user ?? string.Empty, + Password = password ?? string.Empty, + RequestTimeout = timeout, + Compression = true + }; + } +} diff --git a/tests/AVEVA.Historian.Client.Tests/HistorianEventSessionRoundTripTests.cs b/tests/AVEVA.Historian.Client.Tests/HistorianEventSessionRoundTripTests.cs new file mode 100644 index 0000000..2e2d4cb --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/HistorianEventSessionRoundTripTests.cs @@ -0,0 +1,111 @@ +using AVEVA.Historian.Client.Grpc; +using AVEVA.Historian.Client.Models; +using Xunit.Abstractions; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// Live end-to-end round-trip for (the v8 EVENT sibling of +/// ): open ONE reusable event session, SendEvent on it +/// TWICE (no re-handshake/re-register between sends), ping once, dispose. Env-gated exactly like +/// (silent early-return skip without HISTORIAN_GRPC_HOST + +/// HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG). Every send targets the sandbox +/// identity ONLY (carried as the event SourceName/Type), never a production tag. +/// +public sealed class HistorianEventSessionRoundTripTests +{ + private readonly ITestOutputHelper _output; + + public HistorianEventSessionRoundTripTests(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task EventSession_SendTwicePing_AllOnOneSession() + { + string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST"); + string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG"); + if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag) + || string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")) + || string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"))) + { + return; // skip — env not configured + } + + HistorianClientOptions options = BuildOptions(host); + await using var client = new HistorianClient(options); + await using HistorianEventSession session = await client.OpenEventSessionAsync(CancellationToken.None); + + // 1) send TWICE on the SAME (open + CM_EVENT-registered) session — the 2nd skips ECDH+register. + for (int i = 0; i < 2; i++) + { + bool sent = await session.SendEventAsync(BuildSandboxEvent(sandboxTag, attempt: i), CancellationToken.None); + _output.WriteLine($"{i + 1}) reused-send[{i}] -> ok={sent}"); + Assert.True(sent, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess)."); + } + + // 2) ping on the SAME session — must not throw. + await session.PingAsync(CancellationToken.None); + _output.WriteLine("3) ping -> ok"); + + // 3) prove the keepalive op actually RETURNS DATA on the v8 Event handle (not just no-throw): + // issue the same underlying GetSystemParameter the ping uses, directly against the event + // session's connection + ClientHandle, and assert it yields a non-empty value. + string? keepalive = HistorianGrpcStatusClient.GetSystemParameterOnSession( + session.Connection, session.Session.ClientHandle, options, "HistorianVersion", CancellationToken.None); + Assert.False(string.IsNullOrEmpty(keepalive)); + _output.WriteLine($"4) keepalive GetSystemParameter on event handle -> '{keepalive}'"); + + _output.WriteLine("event-session round-trip OK (two sends + ping + verified keepalive on one session)"); + } + + // Build a send event that targets the sandbox identity ONLY (mirrors EventSessionReuseSpikeTests. + // BuildSandboxEvent): the CM_EVENT send buffer carries no per-tag routing field, so the sandbox tag + // NAME is stamped into SourceName + Type + a marker Property so the appended event is unambiguously + // attributable to the sandbox — never a production tag. A fresh Id/timestamps per attempt. + private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt) + { + DateTime now = DateTime.UtcNow; + return new HistorianEvent( + Id: Guid.NewGuid(), + EventTimeUtc: now.AddSeconds(-attempt), + ReceivedTimeUtc: now, + Type: sandboxTag, + SourceName: sandboxTag, + Namespace: "HistGW.EventSessionRoundTrip", + RevisionVersion: 0, + Properties: new Dictionary + { + ["RoundTripAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture), + ["RoundTripMarker"] = "B1-event-session-roundtrip", + }); + } + + // verbatim copy of BuildOptions from HistorianSessionRoundTripTests / EventSessionReuseSpikeTests + private static HistorianClientOptions BuildOptions(string host) + { + string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER"); + string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"); + bool explicitCreds = !string.IsNullOrEmpty(user); + int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed) + ? parsed + : HistorianClientOptions.DefaultGrpcPort; + bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase); + TimeSpan timeout = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TIMEOUT"), out int secs) && secs > 0 + ? TimeSpan.FromSeconds(secs) + : new HistorianClientOptions { Host = host }.RequestTimeout; + + return new HistorianClientOptions + { + Host = host, + Port = port, + Transport = HistorianTransport.RemoteGrpc, + GrpcUseTls = tls, + AllowUntrustedServerCertificate = tls, + ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"), + IntegratedSecurity = !explicitCreds, + UserName = user ?? string.Empty, + Password = password ?? string.Empty, + RequestTimeout = timeout, + Compression = true + }; + } +} diff --git a/tests/AVEVA.Historian.Client.Tests/HistorianSessionRoundTripTests.cs b/tests/AVEVA.Historian.Client.Tests/HistorianSessionRoundTripTests.cs index 1cfce4d..37c4c90 100644 --- a/tests/AVEVA.Historian.Client.Tests/HistorianSessionRoundTripTests.cs +++ b/tests/AVEVA.Historian.Client.Tests/HistorianSessionRoundTripTests.cs @@ -53,7 +53,21 @@ public sealed class HistorianSessionRoundTripTests await session.PingAsync(CancellationToken.None); // must not throw _output.WriteLine("4) ping -> ok"); - _output.WriteLine("session round-trip OK (write+read+status+ping on one session)"); + // 5) metadata + browse on the SAME session (no re-handshake) + HistorianTagMetadata? meta = await session.GetTagMetadataAsync(sandboxTag, CancellationToken.None); + Assert.NotNull(meta); + _output.WriteLine("5) metadata -> ok"); + + List browsed = []; + await foreach (string n in session.BrowseTagNamesAsync("*", CancellationToken.None)) + { + browsed.Add(n); + if (browsed.Count >= 5) break; + } + Assert.NotEmpty(browsed); + _output.WriteLine($"6) browse rows={browsed.Count}"); + + _output.WriteLine("session round-trip OK (write+read+status+ping+metadata+browse on one session)"); } // verbatim copy of BuildOptions from HandshakeReuseSpikeTests diff --git a/tests/AVEVA.Historian.Client.Tests/TagClientOnSessionSeamTests.cs b/tests/AVEVA.Historian.Client.Tests/TagClientOnSessionSeamTests.cs new file mode 100644 index 0000000..92a4078 --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/TagClientOnSessionSeamTests.cs @@ -0,0 +1,22 @@ +using System.Reflection; +using AVEVA.Historian.Client.Grpc; +using Xunit; + +namespace AVEVA.Historian.Client.Tests; + +public class TagClientOnSessionSeamTests +{ + [Theory] + [InlineData("BrowseTagNamesOnSession")] + [InlineData("GetTagInfosRawOnSession")] + [InlineData("GetTagMetadataOnSession")] + public void TagClient_ExposesOnSessionSeam(string name) + { + MethodInfo? m = typeof(HistorianGrpcTagClient).GetMethod( + name, BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Public); + Assert.NotNull(m); + ParameterInfo[] ps = m!.GetParameters(); + Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name); + Assert.Equal("Session", ps[1].ParameterType.Name); + } +}