Files
histsdk/tests/AVEVA.Historian.Client.Tests/EventSessionReuseSpikeTests.cs
T
Joseph Doherty 81da404c5d test(spike): bound event read-after-send so the spike can't hang on C2 long-poll
The Q3 read-after-send probe (ReusedEventSession_ServesReadAfterSend_BestEffort)
long-polled GetNext to the no-data terminal with no tight bound and ran past the
5-min suite timeout on the live run. Bound it two ways: a read-only options copy
with a 5s RequestTimeout (so each GetNext RPC deadlines fast) and an 8s
CancellationToken passed as ct. Whichever fuse fires first makes the method return within ~10s;
the timeout/cancellation is logged as the expected C2-gated outcome (still no assert).
Other three spike methods unchanged.

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
2026-06-25 11:32:09 -04:00

256 lines
14 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Diagnostics;
using Grpc.Core;
using AVEVA.Historian.Client.Grpc;
using AVEVA.Historian.Client.Models;
using Xunit;
using Xunit.Abstractions;
namespace AVEVA.Historian.Client.Tests;
/// <summary>
/// SPIKE (pending.md A1 broadening, Stage B0b): can ONE v8 Event session be REUSED across many event
/// ops without re-handshaking — the precondition for broadening handshake amortization to the event
/// path? Env-gated exactly like <see cref="HandshakeReuseSpikeTests"/> (silent early-return skip
/// without HISTORIAN_GRPC_HOST + HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG).
/// Note that xUnit reports such an early-returned [Fact] as PASSED, not skipped.
///
/// This is the B0b HARNESS only — it is RUN LIVE by a human over VPN in B0c. It SKIPS cleanly offline
/// (no historian contacted, no event sent). It drives the B0a internal seams directly:
/// <see cref="HistorianGrpcEventWriteOrchestrator.OpenAndRegisterEventSession"/> (open v8 Event session
/// + RegisterCmEventTag ONCE) and <see cref="HistorianGrpcEventWriteOrchestrator.SendEventOnSession"/>
/// (send-only, on the externally-supplied warm session).
///
/// Spike questions (priority order), mapped to the test methods below:
/// (1) Does a v8 Event session survive REUSE? — <see cref="ReusedEventSession_SendsTwice_SecondSkipsHandshake"/>
/// (PRIMARY GREEN/RED signal: two sends on one session both succeed; the 2nd skips ECDH+register).
/// (2) Does REGISTER-ONCE work? — <see cref="ReusedEventSession_RegisterOnce_ThenSendMany"/>
/// (OpenAndRegister once, then SendEventOnSession N× — no per-send re-registration).
/// (3) ONE-KIND best-effort — <see cref="ReusedEventSession_ServesReadAfterSend_BestEffort"/>
/// (can the same session also serve a ReadEvents after a send? LOGGED, never asserted — reads are gated C2).
/// (4) IDLE expiry best-effort — <see cref="ReusedEventSession_IdleSweep_BestEffort"/>
/// (how long can the session sit idle before a send breaks? LOGGED, never asserted).
///
/// Optional env vars: HISTORIAN_GRPC_PORT (1..65535), HISTORIAN_GRPC_TLS ("true"), HISTORIAN_GRPC_TIMEOUT
/// (seconds, &gt; 0), HISTORIAN_GRPC_DNSID, HISTORIAN_EVENT_IDLE_SECONDS (seconds, &gt; 0). Missing or
/// invalid values fall back to the defaults.
///
/// SAFETY: every send targets the env var HISTORIAN_EVENT_SANDBOX_TAG ONLY (carried as the event
/// SourceName/Type so the appended events are unambiguously attributable to the sandbox identity, never
/// a production tag). Success is ASSERTED for (1)/(2); latency is LOGGED only (no flaky perf gates).
/// </summary>
public sealed class EventSessionReuseSpikeTests
{
    // Number of sends issued on the single registered session by the register-once probe (2).
    private const int SendMany = 3;

    // Idle gap used by the idle sweep (4) when HISTORIAN_EVENT_IDLE_SECONDS is unset or invalid.
    private const int DefaultIdleSeconds = 25;

    // Highest valid TCP port; larger HISTORIAN_GRPC_PORT values fall back to the default port.
    private const int MaxTcpPort = 65535;

    private readonly ITestOutputHelper _output;

    public EventSessionReuseSpikeTests(ITestOutputHelper output) => _output = output;

    // (1) REUSE VALIDITY — PRIMARY signal. Open+register ONE v8 Event session, then SendEventOnSession
    // TWICE on it with NO re-handshake/re-register between sends. If the server rejects reusing a v8
    // Event session, send #2 fails (false / throws) -> RED finding. Both succeed -> GREEN (event-session
    // reuse is sound, the precondition for event amortization). Latency LOGGED so B0c sees the win
    // (open+register cost vs the two reused sends).
    [Fact]
    public void ReusedEventSession_SendsTwice_SecondSkipsHandshake()
    {
        if (!TryGetEnv(out string host, out string sandboxTag)) return;

        HistorianClientOptions options = BuildOptions(host);
        var orchestrator = new HistorianGrpcEventWriteOrchestrator(options);
        using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);

        var swOpen = Stopwatch.StartNew();
        HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);
        swOpen.Stop();
        _output.WriteLine($"open+register (ECDH handshake + RegisterCmEventTag) = {swOpen.ElapsedMilliseconds} ms");
        _output.WriteLine($"registration diag: {orchestrator.RegistrationDiag}");

        for (int i = 0; i < 2; i++)
        {
            HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i);
            var sw = Stopwatch.StartNew();
            bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None);
            sw.Stop();
            _output.WriteLine($"reused-send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}, lastErr='{orchestrator.LastSendErrorDescription}'");
            Assert.True(ok, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess).");
        }
    }

    // (2) REGISTER-ONCE. Open+register ONCE, then SendEventOnSession N× — proving RegisterCmEventTag does
    // NOT need re-running per send (the seam's whole point). All sends must succeed.
    [Fact]
    public void ReusedEventSession_RegisterOnce_ThenSendMany()
    {
        if (!TryGetEnv(out string host, out string sandboxTag)) return;

        HistorianClientOptions options = BuildOptions(host);
        var orchestrator = new HistorianGrpcEventWriteOrchestrator(options);
        using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
        HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);

        for (int i = 0; i < SendMany; i++)
        {
            HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i);
            var sw = Stopwatch.StartNew();
            bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None);
            sw.Stop();
            _output.WriteLine($"register-once send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}");
            Assert.True(ok, $"register-once send[{i}] should be accepted without per-send re-registration.");
        }
    }

    // (3) ONE-KIND PROBE (best-effort). After a send on the warm session, try a ReadEvents on the SAME
    // session. Event reads are GATED (C2 — the gRPC server long-polls GetNext to the no-data terminal and
    // row-level retrieval is not verified over gRPC), so the outcome (rows or exception) is LOGGED, never
    // asserted: the test passes as long as the catch swallows any failure. Records the one-kind finding
    // (can one Event session serve both send and read?) for B0c.
    [Fact]
    public void ReusedEventSession_ServesReadAfterSend_BestEffort()
    {
        if (!TryGetEnv(out string host, out string sandboxTag)) return;

        HistorianClientOptions options = BuildOptions(host);
        var writeOrch = new HistorianGrpcEventWriteOrchestrator(options);
        using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
        HistorianGrpcHandshake.Session session = writeOrch.OpenAndRegisterEventSession(connection, CancellationToken.None);
        bool sent = writeOrch.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 0), CancellationToken.None);
        _output.WriteLine($"seed-send before read-probe ok={sent}");

        // HARD bound so the READ cannot hang on the known C2 event-read long-poll (GetNext blocks to the
        // no-data terminal on an idle box). Two independent fuses: (1) a read-only options copy with a 5s
        // RequestTimeout so each underlying GetNext RPC deadlines quickly (the read orchestrator caps its
        // poll deadline at min(10s, RequestTimeout)); (2) an 8s CancellationToken passed as ct so the chain
        // is cancelled even if a per-RPC deadline is not honored over a tunnel. Whichever fires first, the
        // read returns in ~10s max even when it never yields data. (Open+register and the seed send above
        // run with CancellationToken.None and are NOT covered by these fuses.)
        HistorianClientOptions readOptions = BuildOptions(host, requestTimeoutOverride: TimeSpan.FromSeconds(5));
        var readOrch = new HistorianGrpcEventOrchestrator(readOptions);
        using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(8));
        (DateTime startUtc, DateTime endUtc) = LastSevenDays();

        try
        {
            List<HistorianEvent> rows = readOrch.RunEventQueryOnSession(
                connection, session, startUtc, endUtc, filter: null, readCts.Token);
            _output.WriteLine($"read-after-send -> OK (rows={rows.Count}) => ONE-KIND (an Event session serves send AND read)");
        }
        catch (Exception ex) when (IsReadBoundHit(ex))
        {
            // EXPECTED outcome: the read hit its bound (CTS timeout or per-RPC deadline) without returning —
            // consistent with the C2 event-read gating (GetNext long-polls to the no-data terminal). This is
            // the recorded one-kind finding, NOT a failure. The RPC status (when present) tells the two
            // fuses apart: DeadlineExceeded = per-RPC deadline, Cancelled = the 8s token.
            string what = ex is RpcException rpcEx ? $"{ex.GetType().Name} ({rpcEx.StatusCode})" : ex.GetType().Name;
            _output.WriteLine($"read-after-send did not return within the bound (consistent with C2 event-read gating): {what}");
        }
        catch (Exception ex)
        {
            // Any other rejection on the reused session is also the finding, not a failure.
            _output.WriteLine($"read-after-send -> swallowed ({ex.GetType().Name}: {ex.Message}) => read gated/unverified over gRPC (expected)");
        }

        // No assertion: this method's job is to RECORD the one-kind outcome for B0c, not gate on it.
    }

    // (4) IDLE-EXPIRY SWEEP (best-effort, log-only). Send, sit idle for a gap, send again; LOG whether the
    // 2nd send broke (and after how long). Bounds how long a warm Event session may sit idle before the
    // server expires it — informs the keepalive cadence for an event-session pool. Default gap 25s;
    // override via HISTORIAN_EVENT_IDLE_SECONDS. NEVER asserted (a break is the finding, not a failure).
    [Fact]
    [Trait("Category", "LiveSpike")]
    public void ReusedEventSession_IdleSweep_BestEffort()
    {
        if (!TryGetEnv(out string host, out string sandboxTag)) return;

        HistorianClientOptions options = BuildOptions(host);
        var orchestrator = new HistorianGrpcEventWriteOrchestrator(options);
        using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
        HistorianGrpcHandshake.Session session = orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);
        bool first = orchestrator.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 0), CancellationToken.None);
        _output.WriteLine($"idle-sweep first send ok={first}");

        int idleSec = TryReadPositiveIntEnv("HISTORIAN_EVENT_IDLE_SECONDS", out int parsed)
            ? parsed
            : DefaultIdleSeconds;
        _output.WriteLine($"idle-sweep: sleeping {idleSec}s before the second send...");
        Thread.Sleep(TimeSpan.FromSeconds(idleSec));

        try
        {
            bool second = orchestrator.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 1), CancellationToken.None);
            _output.WriteLine($"idle {idleSec}s -> second send ok={second} (session {(second ? "SURVIVED" : "rejected")} the idle gap)");
        }
        catch (Exception ex)
        {
            _output.WriteLine($"idle {idleSec}s -> second send BROKE ({ex.GetType().Name}: {ex.Message}) — session expired while idle");
        }

        // No assertion: idle-expiry timing is a LOGGED finding for the keepalive cadence, not a gate.
    }

    // --- helpers ---

    // True when a read-probe exception means "the read hit one of our bounds" rather than a real
    // rejection. Token cancellation may surface either as OperationCanceledException or, with gRPC for
    // .NET's default channel options (ThrowOperationCanceledOnCancellation = false), as
    // RpcException(StatusCode.Cancelled); a per-RPC deadline surfaces as RpcException(DeadlineExceeded).
    private static bool IsReadBoundHit(Exception ex) =>
        ex is OperationCanceledException
        || ex is RpcException { StatusCode: StatusCode.DeadlineExceeded or StatusCode.Cancelled };

    // Build a send event that targets the sandbox identity ONLY. The CM_EVENT send buffer carries no
    // per-tag routing field (it registers against the CM_EVENT system tag), so we stamp the sandbox tag
    // NAME into SourceName + Type and a marker Property so the appended event is unambiguously
    // attributable to the sandbox — never a production tag. A fresh Id/timestamps per attempt.
    private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt)
    {
        DateTime now = DateTime.UtcNow;
        return new HistorianEvent(
            Id: Guid.NewGuid(),
            EventTimeUtc: now.AddSeconds(-attempt),
            ReceivedTimeUtc: now,
            Type: sandboxTag,
            SourceName: sandboxTag,
            Namespace: "HistGW.EventReuseSpike",
            RevisionVersion: 0,
            Properties: new Dictionary<string, object?>
            {
                ["SpikeAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture),
                ["SpikeMarker"] = "B0b-event-session-reuse",
            });
    }

    // Env gate shared by every spike method: returns false (=> caller early-returns) unless host,
    // sandbox tag, user and password are all set. Host/sandbox tag must be non-whitespace.
    private static bool TryGetEnv(out string host, out string sandboxTag)
    {
        host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "";
        sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG") ?? "";
        return !string.IsNullOrWhiteSpace(host)
            && !string.IsNullOrWhiteSpace(sandboxTag)
            && !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER"))
            && !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"));
    }

    // Read window for the read-after-send probe: the 7 days ending now (UTC).
    private static (DateTime StartUtc, DateTime EndUtc) LastSevenDays()
    {
        DateTime end = DateTime.UtcNow;
        return (end - TimeSpan.FromDays(7), end);
    }

    // Parses env var `name` as a strictly positive int using invariant culture (env values are machine
    // config, not user-locale text). Returns false (value = 0) when unset, unparsable, or <= 0.
    private static bool TryReadPositiveIntEnv(string name, out int value)
    {
        bool ok = int.TryParse(
                Environment.GetEnvironmentVariable(name),
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int parsed)
            && parsed > 0;
        value = ok ? parsed : 0;
        return ok;
    }

    // requestTimeoutOverride: when set, forces RequestTimeout (used by the read-after-send probe to give
    // each GetNext RPC a short deadline). null preserves the env-driven default for the send/idle methods.
    private static HistorianClientOptions BuildOptions(string host, TimeSpan? requestTimeoutOverride = null)
    {
        string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
        string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
        bool explicitCreds = !string.IsNullOrEmpty(user);

        // Out-of-range ports (0, negative, > 65535) fall back to the default rather than reaching the channel.
        int port = TryReadPositiveIntEnv("HISTORIAN_GRPC_PORT", out int parsedPort) && parsedPort <= MaxTcpPort
            ? parsedPort
            : HistorianClientOptions.DefaultGrpcPort;
        bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase);

        // Precedence: explicit override > HISTORIAN_GRPC_TIMEOUT (seconds, > 0) > library default.
        TimeSpan timeout = requestTimeoutOverride
            ?? (TryReadPositiveIntEnv("HISTORIAN_GRPC_TIMEOUT", out int secs)
                ? TimeSpan.FromSeconds(secs)
                : new HistorianClientOptions { Host = host }.RequestTimeout);

        return new HistorianClientOptions
        {
            Host = host,
            Port = port,
            Transport = HistorianTransport.RemoteGrpc,
            GrpcUseTls = tls,
            // Spike harness only: when TLS is on, untrusted (e.g. self-signed lab) certs are accepted.
            AllowUntrustedServerCertificate = tls,
            ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"),
            IntegratedSecurity = !explicitCreds,
            UserName = user ?? string.Empty,
            Password = password ?? string.Empty,
            RequestTimeout = timeout,
            Compression = true
        };
    }
}