Merge feat/amortization-broadening: event-reuse spike + HistorianEventSession + browse/metadata seams
Track A (tag-client …OnSession seams + HistorianSession browse/metadata), the GREEN event-session reuse spike (B0), and the HistorianEventSession primitive. Live-validated. Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
@@ -0,0 +1,111 @@
|
||||
# Event-session reuse spike — live results
|
||||
|
||||
> **Question:** does the 2023 R2 historian honor REUSING one authenticated **v8 Event**
|
||||
> session (ECDH `ExchangeKey` → RC4 token → `ConnectionType=Event`, then `RegisterCmEventTag`)
|
||||
> across multiple `SendEvent` ops, instead of the per-op open+register the SDK does today?
|
||||
> This is the precondition for amortizing the EVENT path (HistorianGateway `pending.md` A1
|
||||
> broadening, Stage B0 / B1).
|
||||
>
|
||||
> **Verdict: GREEN — a v8 Event session reuses across sends, register-once is sufficient,
|
||||
> and the amortization is ~10–16×. Event READS stay gated (C2) and are not a reuse signal.**
|
||||
|
||||
**Date:** 2026-06-25
|
||||
**Branch:** `feat/amortization-broadening`
|
||||
**Server:** live 2023 R2 (`wonder-sql-vd03`), RemoteGrpc transport.
|
||||
**Sandbox identity:** `HISTORIAN_EVENT_SANDBOX_TAG=HistGW.LiveTest.EventSpike` — the CM_EVENT send
|
||||
buffer has **no per-tag routing field** (it registers against a fixed system tag), so the sandbox
|
||||
value is stamped into the event `Type`/`SourceName`/`Namespace` + a `SpikeMarker` property as an
|
||||
**identity marker**; no real tag is written or overwritten.
|
||||
**Harness:** `tests/AVEVA.Historian.Client.Tests/EventSessionReuseSpikeTests.cs` driving the B0a
|
||||
seams `HistorianGrpcEventWriteOrchestrator.OpenAndRegisterEventSession` (open v8 Event session +
|
||||
`RegisterCmEventTag` ONCE) and `SendEventOnSession` (send only — no open/register).
|
||||
|
||||
---
|
||||
|
||||
## 1. Send reuse — GREEN
|
||||
|
||||
`ReusedEventSession_SendsTwice_SecondSkipsHandshake` **passed** (both runs): one
|
||||
`OpenAndRegisterEventSession` then **two `SendEventOnSession` on the same v8 Event session** — both
|
||||
accepted (`AddStreamValues` `BSuccess=true`).
|
||||
|
||||
```
|
||||
open+register (ECDH handshake + RegisterCmEventTag) = 242 ms (run 1: 350 ms)
|
||||
registration diag: RTag=True; EnsT=True
|
||||
reused-send[0] = 23 ms, ok=True
|
||||
reused-send[1] = 22 ms, ok=True
|
||||
```
|
||||
|
||||
The server accepts the same v8 Event client handle across back-to-back sends. The session handle is
|
||||
an immutable `readonly record struct (uint ClientHandle, Guid StorageSessionId)`; the send is
|
||||
stateless on the client side (each call reserializes a fresh `"OS"` buffer), so nothing per-op is
|
||||
baked into the handle.
|
||||
|
||||
## 2. Amortization — ~10–16×
|
||||
|
||||
The open+register (P-256 ECDH `ExchangeKey` → RC4 credential token → v8 `OpenConnection` →
|
||||
`RegisterCmEventTag`) costs ~242–350 ms and is paid **once**; a reused send is ~22 ms. So over a
|
||||
burst of N sends the per-send cost collapses from ~(265 ms open + 22 ms) to ~22 ms — a ~10–16× win
|
||||
on the send path, same shape as the v6 read/write amortization (`handshake-reuse-spike-results.md`).
|
||||
|
||||
## 3. Register-once is sufficient — GREEN
|
||||
|
||||
`ReusedEventSession_RegisterOnce_ThenSendMany` **passed**: `RegisterCmEventTag` run **once** (inside
|
||||
`OpenAndRegisterEventSession`), then **three** sends, all accepted.
|
||||
|
||||
```
|
||||
register-once send[0] = 25 ms, ok=True
|
||||
register-once send[1] = 22 ms, ok=True
|
||||
register-once send[2] = 22 ms, ok=True
|
||||
```
|
||||
|
||||
CM_EVENT registration is **session-scoped, not per-send** — the server holds the registration for the
|
||||
session's lifetime. A reuse pool registers once per warm session, not per op.
|
||||
|
||||
## 4. Idle tolerance — survived ≥25 s (best-effort, single sample)
|
||||
|
||||
`ReusedEventSession_IdleSweep_BestEffort` (log-only): after a send, a **25 s idle gap**, then another
|
||||
send — **the second send succeeded** (`session SURVIVED the idle gap`). Notable: the v6 read session
|
||||
idle-expires at a ≥25 s gap (`handshake-reuse-spike-results.md` §3), but this v8 Event session
|
||||
survived 25 s. This is a single-sample best-effort observation — a keepalive should still ping under
|
||||
the ~20 s floor for safety margin until the v8 Event idle boundary is characterized more finely.
|
||||
|
||||
## 5. Read-after-send — GATED (C2), not a reuse signal
|
||||
|
||||
`ReusedEventSession_ServesReadAfterSend_BestEffort` (log-only, hard-bounded by a 5 s gRPC deadline +
|
||||
an 8 s cancellation): the read-after-send on the same session **did not return data** — it cancels at
|
||||
the bound:
|
||||
|
||||
```
|
||||
read-after-send -> swallowed (RpcException Cancelled / OperationCanceled)
|
||||
=> read gated/unverified over gRPC (expected)
|
||||
```
|
||||
|
||||
This matches the pre-existing C2 gate: event **reads** over gRPC long-poll `GetNext` to a no-data
|
||||
terminal and are unverified. So the spike did **not** prove a one-session-serves-both-kinds property
|
||||
for reads — `SendEvent` is the only trustworthy reuse signal. (An unbounded read hung the first run;
|
||||
the harness now bounds it so the spike is a clean, re-runnable record.)
|
||||
|
||||
---
|
||||
|
||||
## 6. Implications for Stage B1 (the event-pool build)
|
||||
|
||||
GREEN → a **separate event-session pool** (the approved B1 approach) is warranted and high-value:
|
||||
|
||||
1. **Amortize `SendEvent` through a bounded event-session pool.** Open+register a v8 Event session
|
||||
once per warm session; lease it per send op (exclusive, like the v6 pool); reuse across a burst.
|
||||
~10–16× on the send path.
|
||||
2. **Keep the event pool SEPARATE from the v6 pool** (B1, as approved) — different auth (ECDH/v8),
|
||||
heavier re-handshake on drop, and its own idle characteristics.
|
||||
3. **`ReadEvents` stays PER-CALL / gated (C2).** Reads are unverified over gRPC regardless of reuse,
|
||||
so the event pool amortizes **sends only**; `ReadEvents` is unaffected by B1 and stays on the
|
||||
per-call path. (This refines the design's "route SendEvent + ReadEvents through the pool": only
|
||||
`SendEvent` is routed; `ReadEvents` remains per-call because it is gated, not because of reuse.)
|
||||
4. **Keepalive:** ping the warm event session under the idle floor. The cheap keepalive op for the
|
||||
event channel is TBD in B1 (the v6 pool uses `GetSystemParameter`; the event session's equivalent
|
||||
warm-touch needs picking — likely a no-op send or a lightweight event-channel status op).
|
||||
5. **Reactive re-auth:** on an expiry-looking failure, evict + full v8 re-handshake (heavier than the
|
||||
v6 re-auth — one ECDH + register penalty).
|
||||
|
||||
**Gate decision: GREEN → HistorianGateway A1 Stage B1 (a bounded `HistorianEventSessionPool` for
|
||||
`SendEvent`, default-on, parallel to the v6 `HistorianSessionPool`) is warranted and earns its own
|
||||
re-planned design + plan.**
|
||||
@@ -163,7 +163,7 @@ internal sealed class HistorianGrpcEventOrchestrator
|
||||
|
||||
RegisterCmEventTag(connection, session, cancellationToken);
|
||||
|
||||
List<HistorianEvent> events = RunEventQuery(connection, session, startUtc, endUtc, filter, cancellationToken);
|
||||
List<HistorianEvent> events = RunEventQueryOnSession(connection, session, startUtc, endUtc, filter, cancellationToken);
|
||||
|
||||
// Honest no-data handling: when the query returns real rows, hand them back. When it instead
|
||||
// reaches the no-data terminal with ZERO rows (the gRPC server long-polls GetNext rather than
|
||||
@@ -273,7 +273,15 @@ internal sealed class HistorianGrpcEventOrchestrator
|
||||
/// <summary>Diagnostic: outcomes of the key CM_EVENT registration RPCs.</summary>
|
||||
public string RegistrationDiag { get; private set; } = string.Empty;
|
||||
|
||||
private List<HistorianEvent> RunEventQuery(
|
||||
// Spike seam (pending.md A1 broadening, Stage B0b): run ONLY the event query (StartEventQuery →
|
||||
// GetNext loop → EndEventQuery) against an EXTERNALLY-supplied, already-opened + CM_EVENT-registered
|
||||
// v8 Event connection + session — NO Create()/OpenSession/RegisterCmEventTag here. The per-call
|
||||
// RunEventChain delegates to this so the per-call read and the B0b reuse spike share one query
|
||||
// implementation (DRY). NOTE: event reads are otherwise GATED (C2) — the gRPC server long-polls
|
||||
// GetNext to the no-data terminal and row-level retrieval is not yet verified over gRPC (see class
|
||||
// remarks); the SEND seam is the spike's primary reuse signal. The split-channel opt-in
|
||||
// (HISTORIAN_GRPC_EVENT_SPLIT_CHANNEL) is preserved inside, unchanged.
|
||||
internal List<HistorianEvent> RunEventQueryOnSession(
|
||||
HistorianGrpcConnection connection,
|
||||
HistorianGrpcHandshake.Session session,
|
||||
DateTime startUtc,
|
||||
|
||||
@@ -67,12 +67,41 @@ internal sealed class HistorianGrpcEventWriteOrchestrator
|
||||
|
||||
// The event SEND uses the same v8 Event connection as the event READ. The write-enabled
|
||||
// open buffer is byte-identical to the read-only one (verified live), so OpenSession's
|
||||
// event path is reused unchanged.
|
||||
// event path is reused unchanged. Per-call: open + register + send on a fresh session.
|
||||
HistorianGrpcHandshake.Session session = OpenAndRegisterEventSession(connection, cancellationToken);
|
||||
|
||||
return SendEventOnSession(connection, session, evt, cancellationToken);
|
||||
}
|
||||
|
||||
// Spike seam (pending.md A1 broadening, Stage B0b): open a v8 Event connection and drive the
|
||||
// CM_EVENT registration ONCE, returning the warm (connection, session). The per-call Run() uses
|
||||
// it for a single send; the B0b reuse spike calls it once and then issues MULTIPLE
|
||||
// SendEventOnSession ops against the returned session to measure whether a v8 Event session can
|
||||
// be reused across sends (it has NEVER been proven reusable — that is exactly what B0b measures).
|
||||
// The caller owns the connection's lifetime (dispose it).
|
||||
internal HistorianGrpcHandshake.Session OpenAndRegisterEventSession(
|
||||
HistorianGrpcConnection connection,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
|
||||
connection, _options, cancellationToken, eventConnection: true);
|
||||
|
||||
RegisterCmEventTag(connection, session, cancellationToken);
|
||||
return session;
|
||||
}
|
||||
|
||||
// Spike seam (pending.md A1 broadening, Stage B0b): perform ONLY the event send against an
|
||||
// EXTERNALLY-supplied, already-opened + CM_EVENT-registered v8 Event connection + session —
|
||||
// i.e. NO Create(), NO OpenSession(eventConnection:true), NO RegisterCmEventTag inside it. The
|
||||
// per-call Run() path delegates here so the per-call send and the B0b reuse-spike send share one
|
||||
// implementation (DRY) and stay byte-identical. The spike drives this repeatedly on one warm
|
||||
// session to measure whether the server honors a reused v8 Event session for multiple sends.
|
||||
internal bool SendEventOnSession(
|
||||
HistorianGrpcConnection connection,
|
||||
HistorianGrpcHandshake.Session session,
|
||||
HistorianEvent evt,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
|
||||
byte[] pBuf = HistorianEventWriteProtocol.SerializeAddStreamValuesBuffer(evt, DateTime.UtcNow);
|
||||
|
||||
|
||||
@@ -39,6 +39,27 @@ internal static class HistorianGrpcTagClient
|
||||
private static HistorianTagMetadata? GetTagMetadata(HistorianClientOptions options, string tag, CancellationToken cancellationToken)
|
||||
{
|
||||
byte[] tagInfos = GetTagInfosRaw(options, [tag], cancellationToken);
|
||||
return ParseTagMetadata(tagInfos);
|
||||
}
|
||||
|
||||
// Spike/Phase-1 seam (pending.md A1): resolve tag metadata against an EXTERNALLY-supplied,
|
||||
// already-authenticated connection + session — i.e. NO Create()/handshake here. The per-call
|
||||
// GetTagMetadata and this seam share the parse tail (ParseTagMetadata) so neither duplicates the
|
||||
// decode logic (DRY).
|
||||
internal static HistorianTagMetadata? GetTagMetadataOnSession(
|
||||
HistorianGrpcConnection connection,
|
||||
HistorianGrpcHandshake.Session session,
|
||||
string tag,
|
||||
HistorianClientOptions options,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
byte[] tagInfos = GetTagInfosRawOnSession(connection, session, [tag], options, cancellationToken);
|
||||
return ParseTagMetadata(tagInfos);
|
||||
}
|
||||
|
||||
// Shared parse tail for both the per-call GetTagMetadata and the reuse-path GetTagMetadataOnSession.
|
||||
private static HistorianTagMetadata? ParseTagMetadata(byte[] tagInfos)
|
||||
{
|
||||
if (tagInfos.Length < 4)
|
||||
{
|
||||
return null;
|
||||
@@ -69,7 +90,19 @@ internal static class HistorianGrpcTagClient
|
||||
{
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
|
||||
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken);
|
||||
return GetTagInfosRawOnSession(connection, session, tags, options, cancellationToken);
|
||||
}
|
||||
|
||||
// Spike/Phase-1 seam (pending.md A1): issue GetTagInfosFromName against an EXTERNALLY-supplied,
|
||||
// already-authenticated connection + session — i.e. NO Create()/handshake here. GetTagInfosRaw
|
||||
// delegates to this so the per-call path and the reuse path share one query implementation (DRY).
|
||||
internal static byte[] GetTagInfosRawOnSession(
|
||||
HistorianGrpcConnection connection,
|
||||
HistorianGrpcHandshake.Session session,
|
||||
IReadOnlyList<string> tags,
|
||||
HistorianClientOptions options,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel);
|
||||
byte[] requestBuffer = BuildTagNamesBuffer(tags);
|
||||
GrpcRetrieval.GetTagInfosFromNameResponse response = retrievalClient.GetTagInfosFromName(
|
||||
@@ -112,6 +145,8 @@ internal static class HistorianGrpcTagClient
|
||||
return Task.Run(() => GetTagExtendedProperties(options, tag, cancellationToken), cancellationToken);
|
||||
}
|
||||
|
||||
// No …OnSession seam: extended-properties browse stays per-call (not amortized through the session
|
||||
// pool — out of A1-broadening scope). Add a seam here only if the pool ever needs to route it.
|
||||
/// <summary>
|
||||
/// Issues a single page-0 <c>GetTagExtendedPropertiesFromName</c> call and returns the raw native
|
||||
/// <c>btTeps</c> response buffer (empty when the server reports no rows / non-success). Internal so
|
||||
@@ -222,6 +257,20 @@ internal static class HistorianGrpcTagClient
|
||||
{
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
|
||||
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken);
|
||||
return BrowseTagNamesOnSession(connection, session, filter, options, cancellationToken);
|
||||
}
|
||||
|
||||
// Spike/Phase-1 seam (pending.md A1): drive StartTagQuery → paged QueryTag → EndTagQuery against an
|
||||
// EXTERNALLY-supplied, already-authenticated connection + session — i.e. NO Create()/handshake here.
|
||||
// BrowseTagNames delegates to this so the per-call path and the reuse path share one browse
|
||||
// implementation (DRY).
|
||||
internal static List<string> BrowseTagNamesOnSession(
|
||||
HistorianGrpcConnection connection,
|
||||
HistorianGrpcHandshake.Session session,
|
||||
string filter,
|
||||
HistorianClientOptions options,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel);
|
||||
DateTime Deadline() => DateTime.UtcNow.Add(options.RequestTimeout);
|
||||
|
||||
|
||||
@@ -382,6 +382,38 @@ public sealed class HistorianClient : IAsyncDisposable
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Opens a reusable v8 EVENT session (ECDH + RegisterCmEventTag ONCE) over the 2023 R2 gRPC
|
||||
/// transport. The caller owns the session and must dispose it. Reusing the session across sends
|
||||
/// amortizes the ECDH+register cost (~10-16×, spike-proven); the server idle-expires it in ~25s,
|
||||
/// so keep it warm (HistorianEventSession.PingAsync) or re-open. For SendEvent amortization only —
|
||||
/// event reads are gated (C2) and not exposed here. RemoteGrpc only.
|
||||
/// </summary>
|
||||
public async Task<HistorianEventSession> OpenEventSessionAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_options.Transport != HistorianTransport.RemoteGrpc)
|
||||
{
|
||||
throw new ProtocolEvidenceMissingException(
|
||||
"HistorianEventSession is only supported over the 2023 R2 RemoteGrpc transport.");
|
||||
}
|
||||
|
||||
return await Task.Run(() =>
|
||||
{
|
||||
Grpc.HistorianGrpcConnection connection = Grpc.HistorianGrpcChannelFactory.Create(_options);
|
||||
try
|
||||
{
|
||||
var orch = new Grpc.HistorianGrpcEventWriteOrchestrator(_options);
|
||||
Grpc.HistorianGrpcHandshake.Session session = orch.OpenAndRegisterEventSession(connection, cancellationToken);
|
||||
return new HistorianEventSession(connection, session, _options);
|
||||
}
|
||||
catch
|
||||
{
|
||||
connection.Dispose(); // don't leak the channel if the handshake fails
|
||||
throw;
|
||||
}
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
using AVEVA.Historian.Client.Grpc;
|
||||
using AVEVA.Historian.Client.Models;
|
||||
|
||||
namespace AVEVA.Historian.Client;
|
||||
|
||||
/// <summary>A live, reusable authenticated v8 EVENT session: holds one event gRPC connection + one
|
||||
/// open+registered Event handle and runs SendEvent on it WITHOUT re-handshaking. Reuse amortizes the
|
||||
/// ECDH+register cost (~10-16×, spike-proven). SendEvent only — event READS are gated (C2) and stay
|
||||
/// per-call. Keep in sync with <see cref="HistorianSession"/> (the v6 sibling).</summary>
|
||||
public sealed class HistorianEventSession : IAsyncDisposable
|
||||
{
|
||||
private readonly HistorianGrpcConnection _connection;
|
||||
private readonly HistorianGrpcHandshake.Session _session;
|
||||
private readonly HistorianClientOptions _options;
|
||||
private int _disposed;
|
||||
|
||||
internal HistorianEventSession(
|
||||
HistorianGrpcConnection connection, HistorianGrpcHandshake.Session session, HistorianClientOptions options)
|
||||
{
|
||||
_connection = connection;
|
||||
_session = session;
|
||||
_options = options;
|
||||
}
|
||||
|
||||
/// <summary>Exposes the held event gRPC connection for internal callers (e.g. the round-trip test
|
||||
/// verifying the keepalive op directly). Not part of the public surface.</summary>
|
||||
internal HistorianGrpcConnection Connection => _connection;
|
||||
|
||||
/// <summary>Exposes the held open+registered Event session handle for internal callers (e.g. the
|
||||
/// round-trip test verifying the keepalive op directly). Not part of the public surface.</summary>
|
||||
internal HistorianGrpcHandshake.Session Session => _session;
|
||||
|
||||
/// <summary>Sends one event on the held (open+registered) v8 Event session.</summary>
|
||||
public Task<bool> SendEventAsync(HistorianEvent evt, CancellationToken ct = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(evt);
|
||||
if (evt.RevisionVersion != 0)
|
||||
{
|
||||
throw new ProtocolEvidenceMissingException(
|
||||
"Only original events (RevisionVersion = 0) have a captured send encoding; " +
|
||||
"revision/update/delete event sends are not yet supported.");
|
||||
}
|
||||
|
||||
var orch = new HistorianGrpcEventWriteOrchestrator(_options);
|
||||
return Task.Run(() => orch.SendEventOnSession(_connection, _session, evt, ct), ct);
|
||||
}
|
||||
|
||||
/// <summary>Keepalive via a lightweight <c>GetSystemParameter</c> status read on the event session's
|
||||
/// <see cref="HistorianGrpcHandshake.Session.ClientHandle"/> (the same status op the native pre-query
|
||||
/// sequence issues against an authenticated Event session), under the server idle floor. Mirrors
|
||||
/// <see cref="HistorianSession.PingAsync"/>. The op's effectiveness on a v8 Event handle is
|
||||
/// live-verified by the round-trip test.</summary>
|
||||
public Task PingAsync(CancellationToken ct = default)
|
||||
=> Task.Run(() => HistorianGrpcStatusClient.GetSystemParameterOnSession(
|
||||
_connection, _session.ClientHandle, _options, "HistorianVersion", ct), ct);
|
||||
|
||||
/// <summary>Disposes the underlying event connection (idempotent).</summary>
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
if (Interlocked.Exchange(ref _disposed, 1) == 0)
|
||||
{
|
||||
_connection.Dispose();
|
||||
}
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,7 @@ namespace AVEVA.Historian.Client;
|
||||
/// <summary>A live, reusable authenticated Historian session: holds one gRPC connection + one
|
||||
/// OpenConnection handle and runs ops on them WITHOUT re-handshaking. Reuse across ops amortizes the
|
||||
/// auth handshake. Idle-expires server-side in ~20-25s — callers keep it warm (PingAsync) or re-open.
|
||||
/// Reads/historical-write/tag-write/status only; events are NOT exposed (separate channel+auth).</summary>
|
||||
/// Reads, browse/metadata, historical-write, tag-write and status; events are NOT exposed (separate channel+auth).</summary>
|
||||
public sealed class HistorianSession : IAsyncDisposable
|
||||
{
|
||||
private readonly HistorianGrpcConnection _connection;
|
||||
@@ -89,6 +89,28 @@ public sealed class HistorianSession : IAsyncDisposable
|
||||
() => orch.RunAtTimeOnSession(_connection, _session.ClientHandle, tag, timestampsUtc, ct), ct);
|
||||
}
|
||||
|
||||
// --- browse / metadata (call the …OnSession seams, which take the full Session for the string handle) ---
|
||||
|
||||
/// <summary>Browses tag names matching <paramref name="filter"/> on the held session.</summary>
|
||||
public async IAsyncEnumerable<string> BrowseTagNamesAsync(
|
||||
string filter = "*",
|
||||
[EnumeratorCancellation] CancellationToken ct = default)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
List<string> names = await Task.Run(
|
||||
() => HistorianGrpcTagClient.BrowseTagNamesOnSession(_connection, _session, filter, _options, ct), ct)
|
||||
.ConfigureAwait(false);
|
||||
foreach (string name in names)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
yield return name;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Reads metadata for <paramref name="tag"/> on the held session (null if unknown).</summary>
|
||||
public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken ct = default)
|
||||
=> Task.Run(() => HistorianGrpcTagClient.GetTagMetadataOnSession(_connection, _session, tag, _options, ct), ct);
|
||||
|
||||
// --- writes (the …OnSession seams take the full Session, since the historical write keys on the
|
||||
// string handle + tag GUID and the tag-config ops mix string/uint handles) ---
|
||||
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
using System.Reflection;
|
||||
using AVEVA.Historian.Client.Grpc;
|
||||
using Xunit;
|
||||
|
||||
namespace AVEVA.Historian.Client.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Reflection guard for the event-on-session seams the B0b reuse spike drives (pending.md A1
|
||||
/// broadening, Stage B0). Mirrors <see cref="TagClientOnSessionSeamTests"/>: the seam runs ONLY the
|
||||
/// op against an externally-supplied (connection, session), so the spike can run MULTIPLE event ops
|
||||
/// on one already-opened + registered v8 Event session to measure reuse.
|
||||
/// </summary>
|
||||
public class EventOnSessionSeamTests
|
||||
{
|
||||
private static MethodInfo RequireMethod(Type owner, string name)
|
||||
{
|
||||
MethodInfo? m = owner.GetMethod(
|
||||
name, BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.Public | BindingFlags.Static);
|
||||
Assert.NotNull(m);
|
||||
return m!;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SendEventOnSession_ExposesSeam_WithConnectionAndSessionFirst()
|
||||
{
|
||||
MethodInfo m = RequireMethod(typeof(HistorianGrpcEventWriteOrchestrator), "SendEventOnSession");
|
||||
ParameterInfo[] ps = m.GetParameters();
|
||||
Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name);
|
||||
Assert.Equal("Session", ps[1].ParameterType.Name);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OpenAndRegisterEventSession_ExposesRegisterOnceSeam()
|
||||
{
|
||||
// The spike registers CM_EVENT ONCE via this helper, then issues many SendEventOnSession ops.
|
||||
MethodInfo m = RequireMethod(typeof(HistorianGrpcEventWriteOrchestrator), "OpenAndRegisterEventSession");
|
||||
ParameterInfo[] ps = m.GetParameters();
|
||||
Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name);
|
||||
Assert.Equal("CancellationToken", ps[1].ParameterType.Name);
|
||||
Assert.Equal("Session", m.ReturnType.Name);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RunEventQueryOnSession_ExposesSeam_WithConnectionAndSessionFirst()
|
||||
{
|
||||
MethodInfo m = RequireMethod(typeof(HistorianGrpcEventOrchestrator), "RunEventQueryOnSession");
|
||||
ParameterInfo[] ps = m.GetParameters();
|
||||
Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name);
|
||||
Assert.Equal("Session", ps[1].ParameterType.Name);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,255 @@
|
||||
using System.Diagnostics;
|
||||
using Grpc.Core;
|
||||
using AVEVA.Historian.Client.Grpc;
|
||||
using AVEVA.Historian.Client.Models;
|
||||
using Xunit;
|
||||
using Xunit.Abstractions;
|
||||
|
||||
namespace AVEVA.Historian.Client.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// SPIKE (pending.md A1 broadening, Stage B0b): can ONE v8 Event session be REUSED across many event
|
||||
/// ops without re-handshaking — the precondition for broadening handshake amortization to the event
|
||||
/// path? Env-gated exactly like <see cref="HandshakeReuseSpikeTests"/> (silent early-return skip
|
||||
/// without HISTORIAN_GRPC_HOST + HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG).
|
||||
///
|
||||
/// This is the B0b HARNESS only — it is RUN LIVE by a human over VPN in B0c. It SKIPS cleanly offline
|
||||
/// (no historian contacted, no event sent). It drives the B0a internal seams directly:
|
||||
/// <see cref="HistorianGrpcEventWriteOrchestrator.OpenAndRegisterEventSession"/> (open v8 Event session
|
||||
/// + RegisterCmEventTag ONCE) and <see cref="HistorianGrpcEventWriteOrchestrator.SendEventOnSession"/>
|
||||
/// (send-only, on the externally-supplied warm session).
|
||||
///
|
||||
/// Spike questions (priority order), mapped to the test methods below:
|
||||
/// (1) Does a v8 Event session survive REUSE? — <see cref="ReusedEventSession_SendsTwice_SecondSkipsHandshake"/>
|
||||
/// (PRIMARY GREEN/RED signal: two sends on one session both succeed; the 2nd skips ECDH+register).
|
||||
/// (2) Does REGISTER-ONCE work? — <see cref="ReusedEventSession_RegisterOnce_ThenSendMany"/>
|
||||
/// (OpenAndRegister once, then SendEventOnSession N× — no per-send re-registration).
|
||||
/// (3) ONE-KIND best-effort — <see cref="ReusedEventSession_ServesReadAfterSend_BestEffort"/>
|
||||
/// (can the same session also serve a ReadEvents after a send? LOGGED, never asserted — reads are gated C2).
|
||||
/// (4) IDLE expiry best-effort — <see cref="ReusedEventSession_IdleSweep_BestEffort"/>
|
||||
/// (how long can the session sit idle before a send breaks? LOGGED, never asserted).
|
||||
///
|
||||
/// SAFETY: every send targets the env var HISTORIAN_EVENT_SANDBOX_TAG ONLY (carried as the event
|
||||
/// SourceName/Type so the appended events are unambiguously attributable to the sandbox identity, never
|
||||
/// a production tag). Success is ASSERTED for (1)/(2); latency is LOGGED only (no flaky perf gates).
|
||||
/// </summary>
|
||||
public sealed class EventSessionReuseSpikeTests
|
||||
{
|
||||
private const int SendMany = 3;
|
||||
private readonly ITestOutputHelper _output;
|
||||
|
||||
public EventSessionReuseSpikeTests(ITestOutputHelper output) => _output = output;
|
||||
|
||||
// (1) REUSE VALIDITY — PRIMARY signal. Open+register ONE v8 Event session, then SendEventOnSession
|
||||
// TWICE on it with NO re-handshake/re-register between sends. If the server rejects reusing a v8
|
||||
// Event session, send #2 fails (false / throws) -> RED finding. Both succeed -> GREEN (event-session
|
||||
// reuse is sound, the precondition for event amortization). Latency LOGGED so B0c sees the win
|
||||
// (open+register cost vs the two reused sends).
|
||||
[Fact]
|
||||
public void ReusedEventSession_SendsTwice_SecondSkipsHandshake()
|
||||
{
|
||||
if (!TryGetEnv(out string host, out string sandboxTag)) return;
|
||||
HistorianClientOptions options = BuildOptions(host);
|
||||
var orchestrator = new HistorianGrpcEventWriteOrchestrator(options);
|
||||
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
|
||||
|
||||
var swOpen = Stopwatch.StartNew();
|
||||
HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);
|
||||
swOpen.Stop();
|
||||
_output.WriteLine($"open+register (ECDH handshake + RegisterCmEventTag) = {swOpen.ElapsedMilliseconds} ms");
|
||||
_output.WriteLine($"registration diag: {orchestrator.RegistrationDiag}");
|
||||
|
||||
for (int i = 0; i < 2; i++)
|
||||
{
|
||||
HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i);
|
||||
var sw = Stopwatch.StartNew();
|
||||
bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None);
|
||||
sw.Stop();
|
||||
_output.WriteLine($"reused-send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}, lastErr='{orchestrator.LastSendErrorDescription}'");
|
||||
Assert.True(ok, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess).");
|
||||
}
|
||||
}
|
||||
|
||||
// (2) REGISTER-ONCE. Open+register ONCE, then SendEventOnSession N× — proving RegisterCmEventTag does
|
||||
// NOT need re-running per send (the seam's whole point). All sends must succeed.
|
||||
[Fact]
|
||||
public void ReusedEventSession_RegisterOnce_ThenSendMany()
|
||||
{
|
||||
if (!TryGetEnv(out string host, out string sandboxTag)) return;
|
||||
HistorianClientOptions options = BuildOptions(host);
|
||||
var orchestrator = new HistorianGrpcEventWriteOrchestrator(options);
|
||||
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
|
||||
HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);
|
||||
|
||||
for (int i = 0; i < SendMany; i++)
|
||||
{
|
||||
HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i);
|
||||
var sw = Stopwatch.StartNew();
|
||||
bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None);
|
||||
sw.Stop();
|
||||
_output.WriteLine($"register-once send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}");
|
||||
Assert.True(ok, $"register-once send[{i}] should be accepted without per-send re-registration.");
|
||||
}
|
||||
}
|
||||
|
||||
// (3) ONE-KIND PROBE (best-effort). After a send on the warm session, try a ReadEvents on the SAME
|
||||
// session. Event reads are GATED (C2 — the gRPC server long-polls GetNext to the no-data terminal and
|
||||
// row-level retrieval is not verified over gRPC), so the outcome (rows or exception) is LOGGED, never
|
||||
// asserted: the test passes as long as the catch swallows any failure. Records the one-kind finding
|
||||
// (can one Event session serve both send and read?) for B0c.
|
||||
[Fact]
|
||||
public void ReusedEventSession_ServesReadAfterSend_BestEffort()
|
||||
{
|
||||
if (!TryGetEnv(out string host, out string sandboxTag)) return;
|
||||
HistorianClientOptions options = BuildOptions(host);
|
||||
var writeOrch = new HistorianGrpcEventWriteOrchestrator(options);
|
||||
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
|
||||
HistorianGrpcHandshake.Session session = writeOrch.OpenAndRegisterEventSession(connection, CancellationToken.None);
|
||||
|
||||
bool sent = writeOrch.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 0), CancellationToken.None);
|
||||
_output.WriteLine($"seed-send before read-probe ok={sent}");
|
||||
|
||||
// HARD bound so this probe CANNOT hang on the known C2 event-read long-poll (GetNext blocks to the
|
||||
// no-data terminal on an idle box). Two independent fuses: (1) a read-only options copy with a 5s
|
||||
// RequestTimeout so each underlying GetNext RPC deadlines quickly (the read orchestrator caps its
|
||||
// poll deadline at min(10s, RequestTimeout)); (2) an 8s CancellationToken passed as ct so the chain
|
||||
// is cancelled even if a per-RPC deadline is not honored over a tunnel. Whichever fires first, the
|
||||
// method returns in ~10s max even when the read never returns data.
|
||||
HistorianClientOptions readOptions = BuildOptions(host, requestTimeoutOverride: TimeSpan.FromSeconds(5));
|
||||
var readOrch = new HistorianGrpcEventOrchestrator(readOptions);
|
||||
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(8));
|
||||
|
||||
(DateTime startUtc, DateTime endUtc) = LastSevenDays();
|
||||
try
|
||||
{
|
||||
List<HistorianEvent> rows = readOrch.RunEventQueryOnSession(
|
||||
connection, session, startUtc, endUtc, filter: null, readCts.Token);
|
||||
_output.WriteLine($"read-after-send -> OK (rows={rows.Count}) => ONE-KIND (an Event session serves send AND read)");
|
||||
}
|
||||
catch (Exception ex) when (ex is OperationCanceledException
|
||||
|| (ex is RpcException rpc && rpc.StatusCode == StatusCode.DeadlineExceeded))
|
||||
{
|
||||
// EXPECTED outcome: the read hit its bound (CTS timeout or per-RPC deadline) without returning —
|
||||
// consistent with the C2 event-read gating (GetNext long-polls to the no-data terminal). This is
|
||||
// the recorded one-kind finding, NOT a failure.
|
||||
_output.WriteLine($"read-after-send did not return within the bound (consistent with C2 event-read gating): {ex.GetType().Name}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Any other rejection on the reused session is also the finding, not a failure.
|
||||
_output.WriteLine($"read-after-send -> swallowed ({ex.GetType().Name}: {ex.Message}) => read gated/unverified over gRPC (expected)");
|
||||
}
|
||||
// No assertion: this method's job is to RECORD the one-kind outcome for B0c, not gate on it.
|
||||
}
|
||||
|
||||
// (4) IDLE-EXPIRY SWEEP (best-effort, log-only). Send, sit idle for a gap, send again; LOG whether the
|
||||
// 2nd send broke (and after how long). Bounds how long a warm Event session may sit idle before the
|
||||
// server expires it — informs the keepalive cadence for an event-session pool. Default gap 25s;
|
||||
// override via HISTORIAN_EVENT_IDLE_SECONDS. NEVER asserted (a break is the finding, not a failure).
|
||||
[Fact]
|
||||
[Trait("Category", "LiveSpike")]
|
||||
public void ReusedEventSession_IdleSweep_BestEffort()
|
||||
{
|
||||
if (!TryGetEnv(out string host, out string sandboxTag)) return;
|
||||
HistorianClientOptions options = BuildOptions(host);
|
||||
var orchestrator = new HistorianGrpcEventWriteOrchestrator(options);
|
||||
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
|
||||
HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);
|
||||
|
||||
bool first = orchestrator.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 0), CancellationToken.None);
|
||||
_output.WriteLine($"idle-sweep first send ok={first}");
|
||||
|
||||
int idleSec = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_EVENT_IDLE_SECONDS"), out int parsed) && parsed > 0
|
||||
? parsed
|
||||
: 25;
|
||||
_output.WriteLine($"idle-sweep: sleeping {idleSec}s before the second send...");
|
||||
Thread.Sleep(TimeSpan.FromSeconds(idleSec));
|
||||
|
||||
try
|
||||
{
|
||||
bool second = orchestrator.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 1), CancellationToken.None);
|
||||
_output.WriteLine($"idle {idleSec}s -> second send ok={second} (session {(second ? "SURVIVED" : "rejected")} the idle gap)");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_output.WriteLine($"idle {idleSec}s -> second send BROKE ({ex.GetType().Name}: {ex.Message}) — session expired while idle");
|
||||
}
|
||||
// No assertion: idle-expiry timing is a LOGGED finding for the keepalive cadence, not a gate.
|
||||
}
|
||||
|
||||
// --- helpers ---
|
||||
|
||||
// Build a send event that targets the sandbox identity ONLY. The CM_EVENT send buffer carries no
|
||||
// per-tag routing field (it registers against the CM_EVENT system tag), so we stamp the sandbox tag
|
||||
// NAME into SourceName + Type and a marker Property so the appended event is unambiguously
|
||||
// attributable to the sandbox — never a production tag. A fresh Id/timestamps per attempt.
|
||||
private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt)
|
||||
{
|
||||
DateTime now = DateTime.UtcNow;
|
||||
return new HistorianEvent(
|
||||
Id: Guid.NewGuid(),
|
||||
EventTimeUtc: now.AddSeconds(-attempt),
|
||||
ReceivedTimeUtc: now,
|
||||
Type: sandboxTag,
|
||||
SourceName: sandboxTag,
|
||||
Namespace: "HistGW.EventReuseSpike",
|
||||
RevisionVersion: 0,
|
||||
Properties: new Dictionary<string, object?>
|
||||
{
|
||||
["SpikeAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||
["SpikeMarker"] = "B0b-event-session-reuse",
|
||||
});
|
||||
}
|
||||
|
||||
private static bool TryGetEnv(out string host, out string sandboxTag)
|
||||
{
|
||||
host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "";
|
||||
sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG") ?? "";
|
||||
return !string.IsNullOrWhiteSpace(host)
|
||||
&& !string.IsNullOrWhiteSpace(sandboxTag)
|
||||
&& !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER"))
|
||||
&& !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"));
|
||||
}
|
||||
|
||||
private static (DateTime StartUtc, DateTime EndUtc) LastSevenDays()
|
||||
{
|
||||
DateTime end = DateTime.UtcNow;
|
||||
return (end - TimeSpan.FromDays(7), end);
|
||||
}
|
||||
|
||||
// requestTimeoutOverride: when set, forces RequestTimeout (used by the read-after-send probe to give
|
||||
// each GetNext RPC a short deadline). null preserves the env-driven default for the send/idle methods.
|
||||
private static HistorianClientOptions BuildOptions(string host, TimeSpan? requestTimeoutOverride = null)
|
||||
{
|
||||
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
|
||||
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
|
||||
bool explicitCreds = !string.IsNullOrEmpty(user);
|
||||
int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed)
|
||||
? parsed
|
||||
: HistorianClientOptions.DefaultGrpcPort;
|
||||
bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase);
|
||||
TimeSpan timeout = requestTimeoutOverride
|
||||
?? (int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TIMEOUT"), out int secs) && secs > 0
|
||||
? TimeSpan.FromSeconds(secs)
|
||||
: new HistorianClientOptions { Host = host }.RequestTimeout);
|
||||
|
||||
return new HistorianClientOptions
|
||||
{
|
||||
Host = host,
|
||||
Port = port,
|
||||
Transport = HistorianTransport.RemoteGrpc,
|
||||
GrpcUseTls = tls,
|
||||
AllowUntrustedServerCertificate = tls,
|
||||
ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"),
|
||||
IntegratedSecurity = !explicitCreds,
|
||||
UserName = user ?? string.Empty,
|
||||
Password = password ?? string.Empty,
|
||||
RequestTimeout = timeout,
|
||||
Compression = true
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
using AVEVA.Historian.Client.Grpc;
|
||||
using AVEVA.Historian.Client.Models;
|
||||
using Xunit.Abstractions;
|
||||
|
||||
namespace AVEVA.Historian.Client.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Live end-to-end round-trip for <see cref="HistorianEventSession"/> (the v8 EVENT sibling of
|
||||
/// <see cref="HistorianSessionRoundTripTests"/>): open ONE reusable event session, SendEvent on it
|
||||
/// TWICE (no re-handshake/re-register between sends), ping once, dispose. Env-gated exactly like
|
||||
/// <see cref="EventSessionReuseSpikeTests"/> (silent early-return skip without HISTORIAN_GRPC_HOST +
|
||||
/// HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG). Every send targets the sandbox
|
||||
/// identity ONLY (carried as the event SourceName/Type), never a production tag.
|
||||
/// </summary>
|
||||
public sealed class HistorianEventSessionRoundTripTests
|
||||
{
|
||||
private readonly ITestOutputHelper _output;
|
||||
|
||||
public HistorianEventSessionRoundTripTests(ITestOutputHelper output) => _output = output;
|
||||
|
||||
[Fact]
|
||||
public async Task EventSession_SendTwicePing_AllOnOneSession()
|
||||
{
|
||||
string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
|
||||
string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG");
|
||||
if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag)
|
||||
|| string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER"))
|
||||
|| string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD")))
|
||||
{
|
||||
return; // skip — env not configured
|
||||
}
|
||||
|
||||
HistorianClientOptions options = BuildOptions(host);
|
||||
await using var client = new HistorianClient(options);
|
||||
await using HistorianEventSession session = await client.OpenEventSessionAsync(CancellationToken.None);
|
||||
|
||||
// 1) send TWICE on the SAME (open + CM_EVENT-registered) session — the 2nd skips ECDH+register.
|
||||
for (int i = 0; i < 2; i++)
|
||||
{
|
||||
bool sent = await session.SendEventAsync(BuildSandboxEvent(sandboxTag, attempt: i), CancellationToken.None);
|
||||
_output.WriteLine($"{i + 1}) reused-send[{i}] -> ok={sent}");
|
||||
Assert.True(sent, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess).");
|
||||
}
|
||||
|
||||
// 2) ping on the SAME session — must not throw.
|
||||
await session.PingAsync(CancellationToken.None);
|
||||
_output.WriteLine("3) ping -> ok");
|
||||
|
||||
// 3) prove the keepalive op actually RETURNS DATA on the v8 Event handle (not just no-throw):
|
||||
// issue the same underlying GetSystemParameter the ping uses, directly against the event
|
||||
// session's connection + ClientHandle, and assert it yields a non-empty value.
|
||||
string? keepalive = HistorianGrpcStatusClient.GetSystemParameterOnSession(
|
||||
session.Connection, session.Session.ClientHandle, options, "HistorianVersion", CancellationToken.None);
|
||||
Assert.False(string.IsNullOrEmpty(keepalive));
|
||||
_output.WriteLine($"4) keepalive GetSystemParameter on event handle -> '{keepalive}'");
|
||||
|
||||
_output.WriteLine("event-session round-trip OK (two sends + ping + verified keepalive on one session)");
|
||||
}
|
||||
|
||||
// Build a send event that targets the sandbox identity ONLY (mirrors EventSessionReuseSpikeTests.
|
||||
// BuildSandboxEvent): the CM_EVENT send buffer carries no per-tag routing field, so the sandbox tag
|
||||
// NAME is stamped into SourceName + Type + a marker Property so the appended event is unambiguously
|
||||
// attributable to the sandbox — never a production tag. A fresh Id/timestamps per attempt.
|
||||
private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt)
|
||||
{
|
||||
DateTime now = DateTime.UtcNow;
|
||||
return new HistorianEvent(
|
||||
Id: Guid.NewGuid(),
|
||||
EventTimeUtc: now.AddSeconds(-attempt),
|
||||
ReceivedTimeUtc: now,
|
||||
Type: sandboxTag,
|
||||
SourceName: sandboxTag,
|
||||
Namespace: "HistGW.EventSessionRoundTrip",
|
||||
RevisionVersion: 0,
|
||||
Properties: new Dictionary<string, object?>
|
||||
{
|
||||
["RoundTripAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture),
|
||||
["RoundTripMarker"] = "B1-event-session-roundtrip",
|
||||
});
|
||||
}
|
||||
|
||||
// verbatim copy of BuildOptions from HistorianSessionRoundTripTests / EventSessionReuseSpikeTests
|
||||
private static HistorianClientOptions BuildOptions(string host)
|
||||
{
|
||||
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
|
||||
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
|
||||
bool explicitCreds = !string.IsNullOrEmpty(user);
|
||||
int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed)
|
||||
? parsed
|
||||
: HistorianClientOptions.DefaultGrpcPort;
|
||||
bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase);
|
||||
TimeSpan timeout = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TIMEOUT"), out int secs) && secs > 0
|
||||
? TimeSpan.FromSeconds(secs)
|
||||
: new HistorianClientOptions { Host = host }.RequestTimeout;
|
||||
|
||||
return new HistorianClientOptions
|
||||
{
|
||||
Host = host,
|
||||
Port = port,
|
||||
Transport = HistorianTransport.RemoteGrpc,
|
||||
GrpcUseTls = tls,
|
||||
AllowUntrustedServerCertificate = tls,
|
||||
ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"),
|
||||
IntegratedSecurity = !explicitCreds,
|
||||
UserName = user ?? string.Empty,
|
||||
Password = password ?? string.Empty,
|
||||
RequestTimeout = timeout,
|
||||
Compression = true
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -53,7 +53,21 @@ public sealed class HistorianSessionRoundTripTests
|
||||
await session.PingAsync(CancellationToken.None); // must not throw
|
||||
_output.WriteLine("4) ping -> ok");
|
||||
|
||||
_output.WriteLine("session round-trip OK (write+read+status+ping on one session)");
|
||||
// 5) metadata + browse on the SAME session (no re-handshake)
|
||||
HistorianTagMetadata? meta = await session.GetTagMetadataAsync(sandboxTag, CancellationToken.None);
|
||||
Assert.NotNull(meta);
|
||||
_output.WriteLine("5) metadata -> ok");
|
||||
|
||||
List<string> browsed = [];
|
||||
await foreach (string n in session.BrowseTagNamesAsync("*", CancellationToken.None))
|
||||
{
|
||||
browsed.Add(n);
|
||||
if (browsed.Count >= 5) break;
|
||||
}
|
||||
Assert.NotEmpty(browsed);
|
||||
_output.WriteLine($"6) browse rows={browsed.Count}");
|
||||
|
||||
_output.WriteLine("session round-trip OK (write+read+status+ping+metadata+browse on one session)");
|
||||
}
|
||||
|
||||
// verbatim copy of BuildOptions from HandshakeReuseSpikeTests
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
using System.Reflection;
|
||||
using AVEVA.Historian.Client.Grpc;
|
||||
using Xunit;
|
||||
|
||||
namespace AVEVA.Historian.Client.Tests;
|
||||
|
||||
public class TagClientOnSessionSeamTests
|
||||
{
|
||||
[Theory]
|
||||
[InlineData("BrowseTagNamesOnSession")]
|
||||
[InlineData("GetTagInfosRawOnSession")]
|
||||
[InlineData("GetTagMetadataOnSession")]
|
||||
public void TagClient_ExposesOnSessionSeam(string name)
|
||||
{
|
||||
MethodInfo? m = typeof(HistorianGrpcTagClient).GetMethod(
|
||||
name, BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Public);
|
||||
Assert.NotNull(m);
|
||||
ParameterInfo[] ps = m!.GetParameters();
|
||||
Assert.Equal("HistorianGrpcConnection", ps[0].ParameterType.Name);
|
||||
Assert.Equal("Session", ps[1].ParameterType.Name);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user