From 056bb39a4d965aa8e0736abfa3fa32408c6a0129 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 16:34:12 -0400 Subject: [PATCH] test(gateway): deterministic multi-subscriber test sync + cap-rejection specificity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Task.Delay(100) subscriber-attachment races with WaitForSubscriberCountAsync, a polling gate on GatewaySession.ActiveEventSubscriberCount so Advise and event fan-out cannot proceed until all subscribers are confirmed registered. Fix WaitForMessageCountAsync to honor a single CancellationTokenSource deadline across the poll loop rather than resetting the timeout on each intermediate wakeup. Add ordering comment in the cancellation test explaining why stream1Task must be awaited before AllowNextEvent to guarantee sub1 is unregistered before the 2nd event is fanned. Assert capException.Status.Detail contains "maximum" in the cap test to distinguish EventSubscriberLimitReached (AllowMultiple=true cap) from EventSubscriberAlreadyActive (single-subscriber rejection) — both map to ResourceExhausted. Extract shared ConfigureCommandReply helper and move FakeWorkerProcess to TestSupport/ so both fake-worker test classes reference one definition. --- .../GatewayEndToEndFakeWorkerSmokeTests.cs | 59 -------- .../GatewayEndToEndMultiSubscriberTests.cs | 141 ++++++++++-------- .../TestSupport/FakeWorkerProcess.cs | 47 ++++++ .../RecordingServerStreamWriter.cs | 23 ++- 4 files changed, 144 insertions(+), 126 deletions(-) create mode 100644 src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs index d4da987..39ddae7 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs @@ -533,63 +533,4 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests } } - private sealed class FakeWorkerProcess(int processId) : IWorkerProcess - { - private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously); - - /// - /// Gets the process identifier. - /// - public int Id { get; } = processId; - - /// - /// Gets a value indicating whether the process has exited. - /// - public bool HasExited { get; private set; } - - /// - /// Gets the exit code of the process. - /// - public int? ExitCode { get; private set; } - - /// - /// Waits for the process to exit asynchronously. Completes only when - /// or has been called, so callers that observe completion can - /// trust that exit actually happened (e.g., via the worker shutdown-ack path). - /// - /// Cancellation token. - /// A task that completes when the process has actually exited. - public ValueTask WaitForExitAsync(CancellationToken cancellationToken) - { - return new ValueTask(_exited.Task.WaitAsync(cancellationToken)); - } - - /// - /// Terminates the process. - /// - /// Whether to kill the entire process tree. - public void Kill(bool entireProcessTree) - { - MarkExited(-1); - } - - /// - /// Releases resources used by this process. - /// - public void Dispose() - { - } - - /// - /// Marks the process as exited with the specified exit code. - /// - /// The process exit code. - public void MarkExited(int exitCode) - { - HasExited = true; - ExitCode = exitCode; - _exited.TrySetResult(); - } - } - } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs index c5320c7..dd91232 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs @@ -70,8 +70,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests writer2, new TestServerCallContext())); - // Give the stream tasks a moment to attach so they are subscribed before Advise. - await Task.Delay(100); + // Wait until both subscribers are registered before issuing the Advise that + // triggers events. Polling ActiveEventSubscriberCount on the live GatewaySession + // is deterministic: we can't race past this point until the production code has + // actually incremented the counter. + await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout); MxCommandReply registerReply = await fixture.Service.Invoke( CreateRegisterRequest(sessionId), @@ -162,8 +165,8 @@ public sealed class GatewayEndToEndMultiSubscriberTests writer2, new TestServerCallContext())); - // Give both streams a moment to attach. - await Task.Delay(100); + // Wait until both subscribers are registered before issuing Advise. + await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout); // Wire up the session: Register + AddItem + Advise. MxCommandReply registerReply = await fixture.Service.Invoke( @@ -190,6 +193,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests await sub1Cts.CancelAsync(); try { + // Awaiting stream1Task here is load-bearing: it ensures EventStreamService's + // finally block runs subscriber.Dispose() → sub1 is fully unregistered before + // AllowNextEvent() below fans the 2nd event. Reordering these two lines would + // let the 2nd event reach sub1 before it is removed, breaking the assertion + // that sub1 received exactly one event. await stream1Task.WaitAsync(TestTimeout); } catch (OperationCanceledException) @@ -238,6 +246,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests { const int cap = 2; GatedEventFakeWorkerProcessLauncher launcher = new(); + + // AllowMultipleEventSubscribers=true is required: the cap rejection path + // (EventSubscriberLimitReached) is only reachable when multi-subscriber mode is on. + // Without it the service hits EventSubscriberAlreadyActive instead — also + // ResourceExhausted, but for the wrong reason — masking a misconfiguration. await using MultiSubscriberGatewayServiceFixture fixture = new(launcher, maxEventSubscribersPerSession: cap); OpenSessionReply openReply = await fixture.Service.OpenSession( @@ -268,8 +281,8 @@ public sealed class GatewayEndToEndMultiSubscriberTests writer2, new TestServerCallContext())); - // Give both streams time to attach. - await Task.Delay(100); + // Wait until both subscribers are registered before emitting or adding a third. + await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout); // Wire up the session so the worker is ready. MxCommandReply registerReply = await fixture.Service.Invoke( @@ -302,6 +315,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests Assert.Equal(StatusCode.ResourceExhausted, capException.StatusCode); + // Confirm this is the cap rejection, not the single-subscriber rejection + // (EventSubscriberAlreadyActive). The production message for + // EventSubscriberLimitReached contains "maximum". + Assert.Contains("maximum", capException.Status.Detail, StringComparison.OrdinalIgnoreCase); + // The first two streams must still be live (not completed). Assert.False(stream1Task.IsCompleted, "Sub1 must still be streaming after sub3 was rejected."); Assert.False(stream2Task.IsCompleted, "Sub2 must still be streaming after sub3 was rejected."); @@ -363,6 +381,25 @@ public sealed class GatewayEndToEndMultiSubscriberTests }, }; + // ---- shared fake worker helpers ---- + + /// + /// Populates with the scripted response for . + /// Shared by both fake worker launchers so the reply logic is not duplicated. + /// + private static void ConfigureCommandReply(MxCommandReply reply, MxCommandKind kind) + { + switch (kind) + { + case MxCommandKind.Register: + reply.Register = new RegisterReply { ServerHandle = ServerHandle }; + break; + case MxCommandKind.AddItem: + reply.AddItem = new AddItemReply { ItemHandle = ItemHandle }; + break; + } + } + // ---- fixture ---- /// @@ -411,6 +448,43 @@ public sealed class GatewayEndToEndMultiSubscriberTests public MxAccessGatewayService Service { get; } + /// + /// Polls for the session + /// identified by until it reaches , + /// bounded by . Fails the test if the count is not reached + /// within the deadline. + /// + /// + /// This is the deterministic gate that replaces Task.Delay before Advise + /// calls: it proves the production code has registered each subscriber before we + /// fan any events. + /// + public async Task WaitForSubscriberCountAsync(string sessionId, int n, TimeSpan timeout) + { + using CancellationTokenSource deadlineCts = new(timeout); + + while (true) + { + if (_registry.TryGet(sessionId, out GatewaySession? session) + && session.ActiveEventSubscriberCount >= n) + { + return; + } + + if (deadlineCts.IsCancellationRequested) + { + int actual = _registry.TryGet(sessionId, out GatewaySession? s) + ? s.ActiveEventSubscriberCount + : -1; + Assert.Fail( + $"Timed out waiting for {n} event subscriber(s) on session {sessionId}. " + + $"Actual count after {timeout.TotalSeconds:0.#}s: {actual}."); + } + + await Task.Delay(millisecondsDelay: 5, deadlineCts.Token).ConfigureAwait(false); + } + } + public async ValueTask DisposeAsync() { foreach (GatewaySession session in _registry.Snapshot()) @@ -505,7 +579,7 @@ public sealed class GatewayEndToEndMultiSubscriberTests await harness.ReplyToCommandAsync( envelope, - configureReply: reply => ConfigureReply(reply, command.Kind), + configureReply: reply => ConfigureCommandReply(reply, command.Kind), cancellationToken: cancellationToken).ConfigureAwait(false); // After Advise emit all events immediately. @@ -533,19 +607,6 @@ public sealed class GatewayEndToEndMultiSubscriberTests } } } - - private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind) - { - switch (kind) - { - case MxCommandKind.Register: - reply.Register = new RegisterReply { ServerHandle = ServerHandle }; - break; - case MxCommandKind.AddItem: - reply.AddItem = new AddItemReply { ItemHandle = ItemHandle }; - break; - } - } } /// @@ -668,7 +729,7 @@ public sealed class GatewayEndToEndMultiSubscriberTests MxCommand command = envelope.WorkerCommand.Command; await harness.ReplyToCommandAsync( envelope, - configureReply: reply => ConfigureReply(reply, command.Kind), + configureReply: reply => ConfigureCommandReply(reply, command.Kind), cancellationToken: cancellationToken).ConfigureAwait(false); if (command.Kind == MxCommandKind.Advise) @@ -678,43 +739,5 @@ public sealed class GatewayEndToEndMultiSubscriberTests } } } - - private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind) - { - switch (kind) - { - case MxCommandKind.Register: - reply.Register = new RegisterReply { ServerHandle = ServerHandle }; - break; - case MxCommandKind.AddItem: - reply.AddItem = new AddItemReply { ItemHandle = ItemHandle }; - break; - } - } - } - - private sealed class FakeWorkerProcess(int processId) : IWorkerProcess - { - private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously); - - public int Id { get; } = processId; - public bool HasExited { get; private set; } - public int? ExitCode { get; private set; } - - public ValueTask WaitForExitAsync(CancellationToken cancellationToken) => - new(_exited.Task.WaitAsync(cancellationToken)); - - public void Kill(bool entireProcessTree) => MarkExited(-1); - - public void Dispose() - { - } - - public void MarkExited(int exitCode) - { - HasExited = true; - ExitCode = exitCode; - _exited.TrySetResult(); - } } } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs new file mode 100644 index 0000000..f65897f --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs @@ -0,0 +1,47 @@ +using ZB.MOM.WW.MxGateway.Server.Workers; + +namespace ZB.MOM.WW.MxGateway.Tests.TestSupport; + +/// +/// Lightweight in-process stand-in for used by fake worker +/// launchers in end-to-end tests. Call from the fake worker +/// body once the shutdown-ack handshake is complete so that callers awaiting +/// observe real exit rather than a timeout. +/// +public sealed class FakeWorkerProcess(int processId) : IWorkerProcess +{ + private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously); + + /// Gets the process identifier. + public int Id { get; } = processId; + + /// Gets a value indicating whether the process has exited. + public bool HasExited { get; private set; } + + /// Gets the exit code of the process, or if it has not exited. + public int? ExitCode { get; private set; } + + /// + public ValueTask WaitForExitAsync(CancellationToken cancellationToken) => + new(_exited.Task.WaitAsync(cancellationToken)); + + /// + public void Kill(bool entireProcessTree) => MarkExited(-1); + + /// + public void Dispose() + { + } + + /// + /// Marks the process as exited with the specified exit code and unblocks + /// any callers of . + /// + /// The process exit code. + public void MarkExited(int exitCode) + { + HasExited = true; + ExitCode = exitCode; + _exited.TrySetResult(); + } +} diff --git a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs index f537bcd..06539b7 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs @@ -70,16 +70,23 @@ public sealed class RecordingServerStreamWriter : IServerStreamWriter /// /// Waits until at least messages have been written, then returns - /// the current snapshot. The wait is bounded by ; if fewer than - /// messages arrive within the timeout the call throws - /// (surfaced as - /// from ). + /// the current snapshot. The wait is bounded by a single deadline of now + timeout; + /// intermediate wakeups (when a message arrives but the count is not yet met) consume from + /// that same deadline so the total elapsed time never exceeds . + /// If fewer than messages arrive before the deadline the call + /// throws . /// /// Minimum number of messages to wait for. - /// Maximum time to wait. + /// Maximum total time to wait, measured from the moment of the call. /// A snapshot of all messages received so far (at least ). public async Task> WaitForMessageCountAsync(int count, TimeSpan timeout) { + // Capture a single deadline so every iteration of the loop below draws from the + // same budget — using WaitAsync(timeout) per iteration would reset the clock on + // each intermediate wakeup, effectively giving N×timeout total budget. + using CancellationTokenSource deadlineCts = new(timeout); + CancellationToken deadlineToken = deadlineCts.Token; + TaskCompletionSource>? tcs = null; lock (_syncRoot) @@ -93,17 +100,17 @@ public sealed class RecordingServerStreamWriter : IServerStreamWriter _countWaiters.Add(tcs); } - // Poll: re-check each time any message arrives. The TCS is satisfied on EVERY write, + // Re-check each time any message arrives. The TCS is satisfied on every write, // but the caller may need more messages, so we loop until the count is met. while (true) { - IReadOnlyList snapshot = await tcs.Task.WaitAsync(timeout).ConfigureAwait(false); + IReadOnlyList snapshot = await tcs.Task.WaitAsync(deadlineToken).ConfigureAwait(false); if (snapshot.Count >= count) { return snapshot; } - // Not enough yet — register a new waiter and keep waiting. + // Not enough yet — register a new waiter and keep waiting against the same deadline. lock (_syncRoot) { if (_messages.Count >= count)