81da404c5d
The Q3 read-after-send probe (ReusedEventSession_ServesReadAfterSend_BestEffort) long-polled GetNext to the no-data terminal with no tight bound and ran past the 5-min suite timeout on the live run. Bound it two ways: a read-only options copy with a 5s RequestTimeout (so each GetNext RPC deadlines fast) and an 8s CancellationToken passed as ct. Either fuse returns the method in ~10s; the timeout/cancellation is logged as the expected C2-gated outcome (still no assert). Other three spike methods unchanged. Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
256 lines
14 KiB
C#
256 lines
14 KiB
C#
using System.Diagnostics;
|
||
using Grpc.Core;
|
||
using AVEVA.Historian.Client.Grpc;
|
||
using AVEVA.Historian.Client.Models;
|
||
using Xunit;
|
||
using Xunit.Abstractions;
|
||
|
||
namespace AVEVA.Historian.Client.Tests;
|
||
|
||
/// <summary>
|
||
/// SPIKE (pending.md A1 broadening, Stage B0b): can ONE v8 Event session be REUSED across many event
|
||
/// ops without re-handshaking — the precondition for broadening handshake amortization to the event
|
||
/// path? Env-gated exactly like <see cref="HandshakeReuseSpikeTests"/> (silent early-return skip
|
||
/// without HISTORIAN_GRPC_HOST + HISTORIAN_USER + HISTORIAN_PASSWORD + HISTORIAN_EVENT_SANDBOX_TAG).
|
||
///
|
||
/// This is the B0b HARNESS only — it is RUN LIVE by a human over VPN in B0c. It SKIPS cleanly offline
|
||
/// (no historian contacted, no event sent). It drives the B0a internal seams directly:
|
||
/// <see cref="HistorianGrpcEventWriteOrchestrator.OpenAndRegisterEventSession"/> (open v8 Event session
|
||
/// + RegisterCmEventTag ONCE) and <see cref="HistorianGrpcEventWriteOrchestrator.SendEventOnSession"/>
|
||
/// (send-only, on the externally-supplied warm session).
|
||
///
|
||
/// Spike questions (priority order), mapped to the test methods below:
|
||
/// (1) Does a v8 Event session survive REUSE? — <see cref="ReusedEventSession_SendsTwice_SecondSkipsHandshake"/>
|
||
/// (PRIMARY GREEN/RED signal: two sends on one session both succeed; the 2nd skips ECDH+register).
|
||
/// (2) Does REGISTER-ONCE work? — <see cref="ReusedEventSession_RegisterOnce_ThenSendMany"/>
|
||
/// (OpenAndRegister once, then SendEventOnSession N× — no per-send re-registration).
|
||
/// (3) ONE-KIND best-effort — <see cref="ReusedEventSession_ServesReadAfterSend_BestEffort"/>
|
||
/// (can the same session also serve a ReadEvents after a send? LOGGED, never asserted — reads are gated C2).
|
||
/// (4) IDLE expiry best-effort — <see cref="ReusedEventSession_IdleSweep_BestEffort"/>
|
||
/// (how long can the session sit idle before a send breaks? LOGGED, never asserted).
|
||
///
|
||
/// SAFETY: every send targets the env var HISTORIAN_EVENT_SANDBOX_TAG ONLY (carried as the event
|
||
/// SourceName/Type so the appended events are unambiguously attributable to the sandbox identity, never
|
||
/// a production tag). Success is ASSERTED for (1)/(2); latency is LOGGED only (no flaky perf gates).
|
||
/// </summary>
|
||
public sealed class EventSessionReuseSpikeTests
|
||
{
|
||
/// <summary>How many sends the register-once test issues on its single registered session.</summary>
private const int SendMany = 3;

/// <summary>xUnit output sink; every timing and finding in this class is written through it.</summary>
private readonly ITestOutputHelper _output;

/// <summary>Stores the xUnit-provided output helper for use by the test methods.</summary>
/// <param name="output">Per-test output helper supplied by xUnit.</param>
public EventSessionReuseSpikeTests(ITestOutputHelper output)
{
    _output = output;
}
|
||
|
||
/// <summary>
/// (1) REUSE VALIDITY — the PRIMARY signal. Opens and registers ONE v8 Event session, then calls
/// SendEventOnSession twice on it with no re-handshake / re-register in between. If the server
/// rejects reuse, send #2 returns false or throws (RED finding); two accepted sends are GREEN
/// (event-session reuse is sound, the precondition for event amortization).
/// The open+register cost and each reused send's latency are LOGGED only, never asserted.
/// Returns silently when the environment gate (<c>TryGetEnv</c>) is not satisfied.
/// </summary>
[Fact]
public void ReusedEventSession_SendsTwice_SecondSkipsHandshake()
{
    const int ReusedSendCount = 2;

    if (!TryGetEnv(out string host, out string sandboxTag))
    {
        return;
    }

    HistorianClientOptions options = BuildOptions(host);
    HistorianGrpcEventWriteOrchestrator orchestrator = new(options);

    using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);

    // Time the one-off open + register so it can be compared against the reused sends below.
    Stopwatch swOpen = Stopwatch.StartNew();
    HistorianGrpcHandshake.Session session =
        orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);
    swOpen.Stop();

    _output.WriteLine($"open+register (ECDH handshake + RegisterCmEventTag) = {swOpen.ElapsedMilliseconds} ms");
    _output.WriteLine($"registration diag: {orchestrator.RegistrationDiag}");

    int i = 0;
    while (i < ReusedSendCount)
    {
        HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i);

        Stopwatch sw = Stopwatch.StartNew();
        bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None);
        sw.Stop();

        _output.WriteLine($"reused-send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}, lastErr='{orchestrator.LastSendErrorDescription}'");

        // The only gate in this test: every send on the reused session must be accepted.
        Assert.True(ok, $"reused v8 Event session send[{i}] should be accepted (AddStreamValues BSuccess).");

        i++;
    }
}
|
||
|
||
/// <summary>
/// (2) REGISTER-ONCE. Opens and registers a session ONCE, then calls SendEventOnSession
/// <c>SendMany</c> times on it, demonstrating that RegisterCmEventTag does not need re-running
/// per send. Every send must be accepted; per-send latency is LOGGED only.
/// Returns silently when the environment gate (<c>TryGetEnv</c>) is not satisfied.
/// </summary>
[Fact]
public void ReusedEventSession_RegisterOnce_ThenSendMany()
{
    if (!TryGetEnv(out string host, out string sandboxTag))
    {
        return;
    }

    HistorianClientOptions options = BuildOptions(host);
    HistorianGrpcEventWriteOrchestrator orchestrator = new(options);

    using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);

    // Single open + register; nothing below re-registers.
    HistorianGrpcHandshake.Session session =
        orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);

    int i = 0;
    while (i < SendMany)
    {
        HistorianEvent evt = BuildSandboxEvent(sandboxTag, attempt: i);

        Stopwatch sw = Stopwatch.StartNew();
        bool ok = orchestrator.SendEventOnSession(connection, session, evt, CancellationToken.None);
        sw.Stop();

        _output.WriteLine($"register-once send[{i}] = {sw.ElapsedMilliseconds} ms, ok={ok}");
        Assert.True(ok, $"register-once send[{i}] should be accepted without per-send re-registration.");

        i++;
    }
}
|
||
|
||
/// <summary>
/// (3) ONE-KIND PROBE (best-effort). After one send on the warm session, attempts a
/// RunEventQueryOnSession on the SAME session and connection. Event reads are GATED (C2 — the gRPC
/// server long-polls GetNext to the no-data terminal and row-level retrieval is not verified over
/// gRPC), so the read outcome (rows, bound hit, or any other exception) is LOGGED and never asserted.
/// Note that the open/register call and the seed send are NOT wrapped in the try block, so a throw
/// from either still fails the test.
/// Returns silently when the environment gate (<c>TryGetEnv</c>) is not satisfied.
/// </summary>
[Fact]
public void ReusedEventSession_ServesReadAfterSend_BestEffort()
{
    if (!TryGetEnv(out string host, out string sandboxTag))
    {
        return;
    }

    HistorianClientOptions options = BuildOptions(host);
    var writeOrch = new HistorianGrpcEventWriteOrchestrator(options);

    using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
    HistorianGrpcHandshake.Session session = writeOrch.OpenAndRegisterEventSession(connection, CancellationToken.None);

    bool sent = writeOrch.SendEventOnSession(connection, session, BuildSandboxEvent(sandboxTag, attempt: 0), CancellationToken.None);
    _output.WriteLine($"seed-send before read-probe ok={sent}");

    // HARD bound so this probe cannot hang on the known C2 event-read long-poll. Two independent fuses:
    //   (1) a read-only options copy with a 5s RequestTimeout, intended to give each underlying GetNext
    //       RPC a short deadline (per the original note, the read orchestrator caps its poll deadline
    //       at min(10s, RequestTimeout) — that cap lives outside this file);
    //   (2) an 8s CancellationToken passed as ct, so the chain is cancelled even if a per-RPC deadline
    //       is not honored over a tunnel.
    // Whichever fires first, the method is expected to return in roughly 10s even with no data.
    HistorianClientOptions readOptions = BuildOptions(host, requestTimeoutOverride: TimeSpan.FromSeconds(5));
    var readOrch = new HistorianGrpcEventOrchestrator(readOptions);
    using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(8));

    (DateTime startUtc, DateTime endUtc) = LastSevenDays();
    try
    {
        List<HistorianEvent> rows = readOrch.RunEventQueryOnSession(
            connection, session, startUtc, endUtc, filter: null, readCts.Token);
        _output.WriteLine($"read-after-send -> OK (rows={rows.Count}) => ONE-KIND (an Event session serves send AND read)");
    }
    catch (Exception ex) when (ex is OperationCanceledException
        or RpcException { StatusCode: StatusCode.DeadlineExceeded or StatusCode.Cancelled })
    {
        // EXPECTED outcome: the read hit its bound (CTS timeout or per-RPC deadline) without returning —
        // consistent with the C2 event-read gating. This is the recorded one-kind finding, NOT a failure.
        // StatusCode.Cancelled is included because gRPC clients commonly report a fired CancellationToken
        // as RpcException(Cancelled) instead of OperationCanceledException; without it the 8s fuse would
        // be logged by the generic handler below as an unrelated rejection.
        _output.WriteLine($"read-after-send did not return within the bound (consistent with C2 event-read gating): {ex.GetType().Name}");
    }
    catch (Exception ex)
    {
        // Any other rejection on the reused session is also the finding, not a failure.
        _output.WriteLine($"read-after-send -> swallowed ({ex.GetType().Name}: {ex.Message}) => read gated/unverified over gRPC (expected)");
    }

    // No assertion: this method's job is to RECORD the one-kind outcome for B0c, not gate on it.
}
|
||
|
||
/// <summary>
/// (4) IDLE-EXPIRY SWEEP (best-effort, log-only). Sends once, blocks the test thread for an idle gap,
/// then sends again and LOGS whether the second send was accepted, rejected, or threw. The gap is
/// 25 seconds unless HISTORIAN_EVENT_IDLE_SECONDS parses to a positive integer. The outcome is meant
/// to inform the keepalive cadence for an event-session pool and is NEVER asserted (a break is the
/// finding, not a failure). The open/register call and the first send are outside the try block, so
/// a throw from either still fails the test.
/// Returns silently when the environment gate (<c>TryGetEnv</c>) is not satisfied.
/// </summary>
[Fact]
[Trait("Category", "LiveSpike")]
public void ReusedEventSession_IdleSweep_BestEffort()
{
    if (!TryGetEnv(out string host, out string sandboxTag))
    {
        return;
    }

    HistorianClientOptions options = BuildOptions(host);
    HistorianGrpcEventWriteOrchestrator orchestrator = new(options);

    using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
    HistorianGrpcHandshake.Session session =
        orchestrator.OpenAndRegisterEventSession(connection, CancellationToken.None);

    HistorianEvent firstEvent = BuildSandboxEvent(sandboxTag, attempt: 0);
    bool first = orchestrator.SendEventOnSession(connection, session, firstEvent, CancellationToken.None);
    _output.WriteLine($"idle-sweep first send ok={first}");

    // Default gap, replaced only by a positive integer override from the environment.
    int idleSec = 25;
    string? idleOverride = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_IDLE_SECONDS");
    if (int.TryParse(idleOverride, out int requestedSeconds) && requestedSeconds > 0)
    {
        idleSec = requestedSeconds;
    }

    _output.WriteLine($"idle-sweep: sleeping {idleSec}s before the second send...");
    Thread.Sleep(TimeSpan.FromSeconds(idleSec));

    try
    {
        HistorianEvent secondEvent = BuildSandboxEvent(sandboxTag, attempt: 1);
        bool second = orchestrator.SendEventOnSession(connection, session, secondEvent, CancellationToken.None);
        _output.WriteLine($"idle {idleSec}s -> second send ok={second} (session {(second ? "SURVIVED" : "rejected")} the idle gap)");
    }
    catch (Exception ex)
    {
        _output.WriteLine($"idle {idleSec}s -> second send BROKE ({ex.GetType().Name}: {ex.Message}) — session expired while idle");
    }

    // No assertion: idle-expiry timing is a LOGGED finding for the keepalive cadence, not a gate.
}
|
||
|
||
// --- helpers ---
|
||
|
||
/// <summary>
/// Builds a send event stamped with the sandbox identity ONLY. Per the original note, the CM_EVENT
/// send buffer carries no per-tag routing field (it registers against the CM_EVENT system tag), so the
/// sandbox tag NAME is written into both <c>Type</c> and <c>SourceName</c>, and marker properties are
/// attached, making the appended event attributable to the sandbox rather than a production tag.
/// Each call produces a fresh Id and fresh timestamps.
/// </summary>
/// <param name="sandboxTag">Sandbox tag name used as the event Type and SourceName.</param>
/// <param name="attempt">
/// Attempt index; recorded in the "SpikeAttempt" property and subtracted (in seconds) from the
/// current UTC time to form the event time.
/// </param>
/// <returns>A new <see cref="HistorianEvent"/> for the given sandbox tag and attempt.</returns>
private static HistorianEvent BuildSandboxEvent(string sandboxTag, int attempt)
{
    DateTime receivedUtc = DateTime.UtcNow;
    DateTime eventUtc = receivedUtc.AddSeconds(-attempt);

    var spikeProperties = new Dictionary<string, object?>
    {
        ["SpikeAttempt"] = attempt.ToString(System.Globalization.CultureInfo.InvariantCulture),
        ["SpikeMarker"] = "B0b-event-session-reuse",
    };

    return new HistorianEvent(
        Id: Guid.NewGuid(),
        EventTimeUtc: eventUtc,
        ReceivedTimeUtc: receivedUtc,
        Type: sandboxTag,
        SourceName: sandboxTag,
        Namespace: "HistGW.EventReuseSpike",
        RevisionVersion: 0,
        Properties: spikeProperties);
}
|
||
|
||
/// <summary>
/// Environment gate for the live spike. Reads HISTORIAN_GRPC_HOST and HISTORIAN_EVENT_SANDBOX_TAG
/// into the out parameters (empty string when unset) and reports whether the run may proceed.
/// </summary>
/// <param name="host">Value of HISTORIAN_GRPC_HOST, or an empty string when unset.</param>
/// <param name="sandboxTag">Value of HISTORIAN_EVENT_SANDBOX_TAG, or an empty string when unset.</param>
/// <returns>
/// <c>true</c> only when host and sandbox tag are non-blank and both HISTORIAN_USER and
/// HISTORIAN_PASSWORD are non-empty; otherwise <c>false</c>.
/// </returns>
private static bool TryGetEnv(out string host, out string sandboxTag)
{
    host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST") ?? "";
    sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_SANDBOX_TAG") ?? "";

    if (string.IsNullOrWhiteSpace(host))
    {
        return false;
    }

    if (string.IsNullOrWhiteSpace(sandboxTag))
    {
        return false;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return false;
    }

    return !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"));
}
|
||
|
||
/// <summary>Returns the UTC window that ends now and starts exactly seven days earlier.</summary>
/// <returns>A tuple of the window start and end, both derived from a single <see cref="DateTime.UtcNow"/> read.</returns>
private static (DateTime StartUtc, DateTime EndUtc) LastSevenDays()
{
    DateTime windowEnd = DateTime.UtcNow;
    DateTime windowStart = windowEnd.Subtract(TimeSpan.FromDays(7));
    return (windowStart, windowEnd);
}
|
||
|
||
/// <summary>
/// Builds the gRPC client options for a live spike run from environment variables:
/// HISTORIAN_USER / HISTORIAN_PASSWORD (credentials), HISTORIAN_GRPC_PORT, HISTORIAN_GRPC_TLS
/// ("true", case-insensitive), HISTORIAN_GRPC_TIMEOUT (seconds) and HISTORIAN_GRPC_DNSID.
/// </summary>
/// <param name="host">Historian host name or address.</param>
/// <param name="requestTimeoutOverride">
/// When set, forces <c>RequestTimeout</c> (used by the read-after-send probe to give each GetNext RPC
/// a short deadline). <c>null</c> keeps the env-driven value, or the options type's own default when
/// HISTORIAN_GRPC_TIMEOUT is absent or not a positive integer.
/// </param>
/// <returns>A populated <see cref="HistorianClientOptions"/> using the RemoteGrpc transport.</returns>
private static HistorianClientOptions BuildOptions(string host, TimeSpan? requestTimeoutOverride = null)
{
    // Environment values are machine-readable, so parse them culture-invariantly.
    static bool TryReadIntEnv(string name, out int value) =>
        int.TryParse(
            Environment.GetEnvironmentVariable(name),
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out value);

    string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
    string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
    bool explicitCreds = !string.IsNullOrEmpty(user);

    // Only a valid TCP port (1..65535) overrides the default; an unparsable or out-of-range value
    // falls back, matching the "> 0" guards applied to the other numeric env overrides.
    int port = TryReadIntEnv("HISTORIAN_GRPC_PORT", out int parsedPort) && parsedPort is >= 1 and <= 65535
        ? parsedPort
        : HistorianClientOptions.DefaultGrpcPort;

    bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase);

    // Precedence: explicit override, then positive HISTORIAN_GRPC_TIMEOUT seconds, then the default
    // RequestTimeout of a freshly constructed options instance.
    TimeSpan timeout;
    if (requestTimeoutOverride is TimeSpan forcedTimeout)
    {
        timeout = forcedTimeout;
    }
    else if (TryReadIntEnv("HISTORIAN_GRPC_TIMEOUT", out int secs) && secs > 0)
    {
        timeout = TimeSpan.FromSeconds(secs);
    }
    else
    {
        timeout = new HistorianClientOptions { Host = host }.RequestTimeout;
    }

    return new HistorianClientOptions
    {
        Host = host,
        Port = port,
        Transport = HistorianTransport.RemoteGrpc,
        GrpcUseTls = tls,
        // NOTE(review): untrusted server certificates are accepted whenever TLS is enabled —
        // presumably acceptable only because this targets a sandbox; confirm before reuse elsewhere.
        AllowUntrustedServerCertificate = tls,
        ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"),
        // Integrated security is used only when no explicit user name is supplied.
        IntegratedSecurity = !explicitCreds,
        UserName = user ?? string.Empty,
        Password = password ?? string.Empty,
        RequestTimeout = timeout,
        Compression = true
    };
}
|
||
}
|