test(gateway): deterministic multi-subscriber test sync + cap-rejection specificity

Replace Task.Delay(100) subscriber-attachment races with WaitForSubscriberCountAsync,
a polling gate on GatewaySession.ActiveEventSubscriberCount so Advise and event fan-out
cannot proceed until all subscribers are confirmed registered.

Fix WaitForMessageCountAsync to honor a single CancellationTokenSource deadline across
the poll loop rather than resetting the timeout on each intermediate wakeup.

Add ordering comment in the cancellation test explaining why stream1Task must be awaited
before AllowNextEvent to guarantee sub1 is unregistered before the 2nd event is fanned.

Assert capException.Status.Detail contains "maximum" in the cap test to distinguish
EventSubscriberLimitReached (AllowMultiple=true cap) from EventSubscriberAlreadyActive
(single-subscriber rejection) — both map to ResourceExhausted.

Extract shared ConfigureCommandReply helper and move FakeWorkerProcess to TestSupport/
so both fake-worker test classes reference one definition.
This commit is contained in:
Joseph Doherty
2026-06-15 16:34:12 -04:00
parent 9dd97a27f1
commit 056bb39a4d
4 changed files with 144 additions and 126 deletions
@@ -533,63 +533,4 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
}
}
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>
/// Gets the process identifier.
/// </summary>
public int Id { get; } = processId;
/// <summary>
/// Gets a value indicating whether the process has exited.
/// </summary>
public bool HasExited { get; private set; }
/// <summary>
/// Gets the exit code of the process.
/// </summary>
public int? ExitCode { get; private set; }
/// <summary>
/// Waits for the process to exit asynchronously. Completes only when <see cref="Kill"/>
/// or <see cref="MarkExited"/> has been called, so callers that observe completion can
/// trust that exit actually happened (e.g., via the worker shutdown-ack path).
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task that completes when the process has actually exited.</returns>
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
return new ValueTask(_exited.Task.WaitAsync(cancellationToken));
}
/// <summary>
/// Terminates the process.
/// </summary>
/// <param name="entireProcessTree">Whether to kill the entire process tree.</param>
public void Kill(bool entireProcessTree)
{
MarkExited(-1);
}
/// <summary>
/// Releases resources used by this process.
/// </summary>
public void Dispose()
{
}
/// <summary>
/// Marks the process as exited with the specified exit code.
/// </summary>
/// <param name="exitCode">The process exit code.</param>
public void MarkExited(int exitCode)
{
HasExited = true;
ExitCode = exitCode;
_exited.TrySetResult();
}
}
}
@@ -70,8 +70,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
writer2,
new TestServerCallContext()));
// Give the stream tasks a moment to attach so they are subscribed before Advise.
await Task.Delay(100);
// Wait until both subscribers are registered before issuing the Advise that
// triggers events. Polling ActiveEventSubscriberCount on the live GatewaySession
// is deterministic: we can't race past this point until the production code has
// actually incremented the counter.
await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
@@ -162,8 +165,8 @@ public sealed class GatewayEndToEndMultiSubscriberTests
writer2,
new TestServerCallContext()));
// Give both streams a moment to attach.
await Task.Delay(100);
// Wait until both subscribers are registered before issuing Advise.
await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
// Wire up the session: Register + AddItem + Advise.
MxCommandReply registerReply = await fixture.Service.Invoke(
@@ -190,6 +193,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
await sub1Cts.CancelAsync();
try
{
// Awaiting stream1Task here is load-bearing: it ensures EventStreamService's
// finally block runs subscriber.Dispose() → sub1 is fully unregistered before
// AllowNextEvent() below fans the 2nd event. Reordering these two lines would
// let the 2nd event reach sub1 before it is removed, breaking the assertion
// that sub1 received exactly one event.
await stream1Task.WaitAsync(TestTimeout);
}
catch (OperationCanceledException)
@@ -238,6 +246,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
{
const int cap = 2;
GatedEventFakeWorkerProcessLauncher launcher = new();
// AllowMultipleEventSubscribers=true is required: the cap rejection path
// (EventSubscriberLimitReached) is only reachable when multi-subscriber mode is on.
// Without it the service hits EventSubscriberAlreadyActive instead — also
// ResourceExhausted, but for the wrong reason — masking a misconfiguration.
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher, maxEventSubscribersPerSession: cap);
OpenSessionReply openReply = await fixture.Service.OpenSession(
@@ -268,8 +281,8 @@ public sealed class GatewayEndToEndMultiSubscriberTests
writer2,
new TestServerCallContext()));
// Give both streams time to attach.
await Task.Delay(100);
// Wait until both subscribers are registered before emitting or adding a third.
await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
// Wire up the session so the worker is ready.
MxCommandReply registerReply = await fixture.Service.Invoke(
@@ -302,6 +315,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
Assert.Equal(StatusCode.ResourceExhausted, capException.StatusCode);
// Confirm this is the cap rejection, not the single-subscriber rejection
// (EventSubscriberAlreadyActive). The production message for
// EventSubscriberLimitReached contains "maximum".
Assert.Contains("maximum", capException.Status.Detail, StringComparison.OrdinalIgnoreCase);
// The first two streams must still be live (not completed).
Assert.False(stream1Task.IsCompleted, "Sub1 must still be streaming after sub3 was rejected.");
Assert.False(stream2Task.IsCompleted, "Sub2 must still be streaming after sub3 was rejected.");
@@ -363,6 +381,25 @@ public sealed class GatewayEndToEndMultiSubscriberTests
},
};
// ---- shared fake worker helpers ----
/// <summary>
/// Populates <paramref name="reply"/> with the scripted response for <paramref name="kind"/>.
/// Shared by both fake worker launchers so the reply logic is not duplicated.
/// </summary>
private static void ConfigureCommandReply(MxCommandReply reply, MxCommandKind kind)
{
switch (kind)
{
case MxCommandKind.Register:
reply.Register = new RegisterReply { ServerHandle = ServerHandle };
break;
case MxCommandKind.AddItem:
reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
break;
}
}
// ---- fixture ----
/// <summary>
@@ -411,6 +448,43 @@ public sealed class GatewayEndToEndMultiSubscriberTests
public MxAccessGatewayService Service { get; }
/// <summary>
/// Polls <see cref="GatewaySession.ActiveEventSubscriberCount"/> for the session
/// identified by <paramref name="sessionId"/> until it reaches <paramref name="n"/>,
/// bounded by <paramref name="timeout"/>. Fails the test if the count is not reached
/// within the deadline.
/// </summary>
/// <remarks>
/// This is the deterministic gate that replaces <c>Task.Delay</c> before Advise
/// calls: it proves the production code has registered each subscriber before we
/// fan any events.
/// </remarks>
public async Task WaitForSubscriberCountAsync(string sessionId, int n, TimeSpan timeout)
{
using CancellationTokenSource deadlineCts = new(timeout);
while (true)
{
if (_registry.TryGet(sessionId, out GatewaySession? session)
&& session.ActiveEventSubscriberCount >= n)
{
return;
}
if (deadlineCts.IsCancellationRequested)
{
int actual = _registry.TryGet(sessionId, out GatewaySession? s)
? s.ActiveEventSubscriberCount
: -1;
Assert.Fail(
$"Timed out waiting for {n} event subscriber(s) on session {sessionId}. "
+ $"Actual count after {timeout.TotalSeconds:0.#}s: {actual}.");
}
await Task.Delay(millisecondsDelay: 5, deadlineCts.Token).ConfigureAwait(false);
}
}
public async ValueTask DisposeAsync()
{
foreach (GatewaySession session in _registry.Snapshot())
@@ -505,7 +579,7 @@ public sealed class GatewayEndToEndMultiSubscriberTests
await harness.ReplyToCommandAsync(
envelope,
configureReply: reply => ConfigureReply(reply, command.Kind),
configureReply: reply => ConfigureCommandReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
// After Advise emit all events immediately.
@@ -533,19 +607,6 @@ public sealed class GatewayEndToEndMultiSubscriberTests
}
}
}
private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind)
{
switch (kind)
{
case MxCommandKind.Register:
reply.Register = new RegisterReply { ServerHandle = ServerHandle };
break;
case MxCommandKind.AddItem:
reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
break;
}
}
}
/// <summary>
@@ -668,7 +729,7 @@ public sealed class GatewayEndToEndMultiSubscriberTests
MxCommand command = envelope.WorkerCommand.Command;
await harness.ReplyToCommandAsync(
envelope,
configureReply: reply => ConfigureReply(reply, command.Kind),
configureReply: reply => ConfigureCommandReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
if (command.Kind == MxCommandKind.Advise)
@@ -678,43 +739,5 @@ public sealed class GatewayEndToEndMultiSubscriberTests
}
}
}
private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind)
{
switch (kind)
{
case MxCommandKind.Register:
reply.Register = new RegisterReply { ServerHandle = ServerHandle };
break;
case MxCommandKind.AddItem:
reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
break;
}
}
}
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
public int Id { get; } = processId;
public bool HasExited { get; private set; }
public int? ExitCode { get; private set; }
public ValueTask WaitForExitAsync(CancellationToken cancellationToken) =>
new(_exited.Task.WaitAsync(cancellationToken));
public void Kill(bool entireProcessTree) => MarkExited(-1);
public void Dispose()
{
}
public void MarkExited(int exitCode)
{
HasExited = true;
ExitCode = exitCode;
_exited.TrySetResult();
}
}
}
@@ -0,0 +1,47 @@
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.TestSupport;
/// <summary>
/// Lightweight in-process stand-in for <see cref="IWorkerProcess"/> used by fake worker
/// launchers in end-to-end tests. Call <see cref="MarkExited"/> from the fake worker
/// body once the shutdown-ack handshake is complete so that callers awaiting
/// <see cref="WaitForExitAsync"/> observe real exit rather than a timeout.
/// </summary>
public sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>Gets the process identifier.</summary>
public int Id { get; } = processId;
/// <summary>Gets a value indicating whether the process has exited.</summary>
public bool HasExited { get; private set; }
/// <summary>Gets the exit code of the process, or <see langword="null"/> if it has not exited.</summary>
public int? ExitCode { get; private set; }
/// <inheritdoc />
public ValueTask WaitForExitAsync(CancellationToken cancellationToken) =>
new(_exited.Task.WaitAsync(cancellationToken));
/// <inheritdoc />
public void Kill(bool entireProcessTree) => MarkExited(-1);
/// <inheritdoc />
public void Dispose()
{
}
/// <summary>
/// Marks the process as exited with the specified exit code and unblocks
/// any callers of <see cref="WaitForExitAsync"/>.
/// </summary>
/// <param name="exitCode">The process exit code.</param>
public void MarkExited(int exitCode)
{
HasExited = true;
ExitCode = exitCode;
_exited.TrySetResult();
}
}
@@ -70,16 +70,23 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
/// <summary>
/// Waits until at least <paramref name="count"/> messages have been written, then returns
/// the current snapshot. The wait is bounded by <paramref name="timeout"/>; if fewer than
/// <paramref name="count"/> messages arrive within the timeout the call throws
/// <see cref="TimeoutException"/> (surfaced as <see cref="OperationCanceledException"/>
/// from <see cref="Task.WaitAsync(TimeSpan)"/>).
/// the current snapshot. The wait is bounded by a single deadline of <c>now + timeout</c>;
/// intermediate wakeups (when a message arrives but the count is not yet met) consume from
/// that same deadline so the total elapsed time never exceeds <paramref name="timeout"/>.
/// If fewer than <paramref name="count"/> messages arrive before the deadline the call
/// throws <see cref="OperationCanceledException"/>.
/// </summary>
/// <param name="count">Minimum number of messages to wait for.</param>
/// <param name="timeout">Maximum time to wait.</param>
/// <param name="timeout">Maximum total time to wait, measured from the moment of the call.</param>
/// <returns>A snapshot of all messages received so far (at least <paramref name="count"/>).</returns>
public async Task<IReadOnlyList<T>> WaitForMessageCountAsync(int count, TimeSpan timeout)
{
// Capture a single deadline so every iteration of the loop below draws from the
// same budget — using WaitAsync(timeout) per iteration would reset the clock on
// each intermediate wakeup, effectively giving N×timeout total budget.
using CancellationTokenSource deadlineCts = new(timeout);
CancellationToken deadlineToken = deadlineCts.Token;
TaskCompletionSource<IReadOnlyList<T>>? tcs = null;
lock (_syncRoot)
@@ -93,17 +100,17 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
_countWaiters.Add(tcs);
}
// Poll: re-check each time any message arrives. The TCS is satisfied on EVERY write,
// Re-check each time any message arrives. The TCS is satisfied on every write,
// but the caller may need more messages, so we loop until the count is met.
while (true)
{
IReadOnlyList<T> snapshot = await tcs.Task.WaitAsync(timeout).ConfigureAwait(false);
IReadOnlyList<T> snapshot = await tcs.Task.WaitAsync(deadlineToken).ConfigureAwait(false);
if (snapshot.Count >= count)
{
return snapshot;
}
// Not enough yet — register a new waiter and keep waiting.
// Not enough yet — register a new waiter and keep waiting against the same deadline.
lock (_syncRoot)
{
if (_messages.Count >= count)