diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs
index d4da987..39ddae7 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs
@@ -533,63 +533,4 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
}
}
- private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
- {
- private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
-
- ///
- /// Gets the process identifier.
- ///
- public int Id { get; } = processId;
-
- ///
- /// Gets a value indicating whether the process has exited.
- ///
- public bool HasExited { get; private set; }
-
- ///
- /// Gets the exit code of the process.
- ///
- public int? ExitCode { get; private set; }
-
- ///
- /// Waits for the process to exit asynchronously. Completes only when
- /// or has been called, so callers that observe completion can
- /// trust that exit actually happened (e.g., via the worker shutdown-ack path).
- ///
- /// Cancellation token.
- /// A task that completes when the process has actually exited.
- public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
- {
- return new ValueTask(_exited.Task.WaitAsync(cancellationToken));
- }
-
- ///
- /// Terminates the process.
- ///
- /// Whether to kill the entire process tree.
- public void Kill(bool entireProcessTree)
- {
- MarkExited(-1);
- }
-
- ///
- /// Releases resources used by this process.
- ///
- public void Dispose()
- {
- }
-
- ///
- /// Marks the process as exited with the specified exit code.
- ///
- /// The process exit code.
- public void MarkExited(int exitCode)
- {
- HasExited = true;
- ExitCode = exitCode;
- _exited.TrySetResult();
- }
- }
-
}
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs
index c5320c7..dd91232 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs
@@ -70,8 +70,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
writer2,
new TestServerCallContext()));
- // Give the stream tasks a moment to attach so they are subscribed before Advise.
- await Task.Delay(100);
+ // Wait until both subscribers are registered before issuing the Advise that
+ // triggers events. Polling ActiveEventSubscriberCount on the live GatewaySession
+ // is deterministic: we can't race past this point until the production code has
+ // actually incremented the counter.
+ await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
@@ -162,8 +165,8 @@ public sealed class GatewayEndToEndMultiSubscriberTests
writer2,
new TestServerCallContext()));
- // Give both streams a moment to attach.
- await Task.Delay(100);
+ // Wait until both subscribers are registered before issuing Advise.
+ await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
// Wire up the session: Register + AddItem + Advise.
MxCommandReply registerReply = await fixture.Service.Invoke(
@@ -190,6 +193,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
await sub1Cts.CancelAsync();
try
{
+ // Awaiting stream1Task here is load-bearing: it ensures EventStreamService's
+ // finally block runs subscriber.Dispose() → sub1 is fully unregistered before
+ // AllowNextEvent() below fans the 2nd event. Reordering these two lines would
+ // let the 2nd event reach sub1 before it is removed, breaking the assertion
+ // that sub1 received exactly one event.
await stream1Task.WaitAsync(TestTimeout);
}
catch (OperationCanceledException)
@@ -238,6 +246,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
{
const int cap = 2;
GatedEventFakeWorkerProcessLauncher launcher = new();
+
+ // AllowMultipleEventSubscribers=true is required: the cap rejection path
+ // (EventSubscriberLimitReached) is only reachable when multi-subscriber mode is on.
+ // Without it the service hits EventSubscriberAlreadyActive instead — also
+ // ResourceExhausted, but for the wrong reason — masking a misconfiguration.
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher, maxEventSubscribersPerSession: cap);
OpenSessionReply openReply = await fixture.Service.OpenSession(
@@ -268,8 +281,8 @@ public sealed class GatewayEndToEndMultiSubscriberTests
writer2,
new TestServerCallContext()));
- // Give both streams time to attach.
- await Task.Delay(100);
+ // Wait until both subscribers are registered before emitting or adding a third.
+ await fixture.WaitForSubscriberCountAsync(sessionId, n: 2, TestTimeout);
// Wire up the session so the worker is ready.
MxCommandReply registerReply = await fixture.Service.Invoke(
@@ -302,6 +315,11 @@ public sealed class GatewayEndToEndMultiSubscriberTests
Assert.Equal(StatusCode.ResourceExhausted, capException.StatusCode);
+ // Confirm this is the cap rejection, not the single-subscriber rejection
+ // (EventSubscriberAlreadyActive). The production message for
+ // EventSubscriberLimitReached contains "maximum".
+ Assert.Contains("maximum", capException.Status.Detail, StringComparison.OrdinalIgnoreCase);
+
// The first two streams must still be live (not completed).
Assert.False(stream1Task.IsCompleted, "Sub1 must still be streaming after sub3 was rejected.");
Assert.False(stream2Task.IsCompleted, "Sub2 must still be streaming after sub3 was rejected.");
@@ -363,6 +381,25 @@ public sealed class GatewayEndToEndMultiSubscriberTests
},
};
+ // ---- shared fake worker helpers ----
+
+ ///
+ /// Populates with the scripted response for .
+ /// Shared by both fake worker launchers so the reply logic is not duplicated.
+ ///
+ private static void ConfigureCommandReply(MxCommandReply reply, MxCommandKind kind)
+ {
+ switch (kind)
+ {
+ case MxCommandKind.Register:
+ reply.Register = new RegisterReply { ServerHandle = ServerHandle };
+ break;
+ case MxCommandKind.AddItem:
+ reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
+ break;
+ }
+ }
+
// ---- fixture ----
///
@@ -411,6 +448,43 @@ public sealed class GatewayEndToEndMultiSubscriberTests
public MxAccessGatewayService Service { get; }
+ ///
+ /// Polls for the session
+ /// identified by until it reaches ,
+ /// bounded by . Fails the test if the count is not reached
+ /// within the deadline.
+ ///
+ ///
+ /// This is the deterministic gate that replaces Task.Delay before Advise
+ /// calls: it proves the production code has registered each subscriber before we
+ /// fan any events.
+ ///
+ public async Task WaitForSubscriberCountAsync(string sessionId, int n, TimeSpan timeout)
+ {
+ using CancellationTokenSource deadlineCts = new(timeout);
+
+ while (true)
+ {
+ if (_registry.TryGet(sessionId, out GatewaySession? session)
+ && session.ActiveEventSubscriberCount >= n)
+ {
+ return;
+ }
+
+ if (deadlineCts.IsCancellationRequested)
+ {
+ int actual = _registry.TryGet(sessionId, out GatewaySession? s)
+ ? s.ActiveEventSubscriberCount
+ : -1;
+ Assert.Fail(
+ $"Timed out waiting for {n} event subscriber(s) on session {sessionId}. "
+ + $"Actual count after {timeout.TotalSeconds:0.#}s: {actual}.");
+ }
+
+ await Task.Delay(millisecondsDelay: 5, deadlineCts.Token).ConfigureAwait(false);
+ }
+ }
+
public async ValueTask DisposeAsync()
{
foreach (GatewaySession session in _registry.Snapshot())
@@ -505,7 +579,7 @@ public sealed class GatewayEndToEndMultiSubscriberTests
await harness.ReplyToCommandAsync(
envelope,
- configureReply: reply => ConfigureReply(reply, command.Kind),
+ configureReply: reply => ConfigureCommandReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
// After Advise emit all events immediately.
@@ -533,19 +607,6 @@ public sealed class GatewayEndToEndMultiSubscriberTests
}
}
}
-
- private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind)
- {
- switch (kind)
- {
- case MxCommandKind.Register:
- reply.Register = new RegisterReply { ServerHandle = ServerHandle };
- break;
- case MxCommandKind.AddItem:
- reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
- break;
- }
- }
}
///
@@ -668,7 +729,7 @@ public sealed class GatewayEndToEndMultiSubscriberTests
MxCommand command = envelope.WorkerCommand.Command;
await harness.ReplyToCommandAsync(
envelope,
- configureReply: reply => ConfigureReply(reply, command.Kind),
+ configureReply: reply => ConfigureCommandReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
if (command.Kind == MxCommandKind.Advise)
@@ -678,43 +739,5 @@ public sealed class GatewayEndToEndMultiSubscriberTests
}
}
}
-
- private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind)
- {
- switch (kind)
- {
- case MxCommandKind.Register:
- reply.Register = new RegisterReply { ServerHandle = ServerHandle };
- break;
- case MxCommandKind.AddItem:
- reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
- break;
- }
- }
- }
-
- private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
- {
- private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
-
- public int Id { get; } = processId;
- public bool HasExited { get; private set; }
- public int? ExitCode { get; private set; }
-
- public ValueTask WaitForExitAsync(CancellationToken cancellationToken) =>
- new(_exited.Task.WaitAsync(cancellationToken));
-
- public void Kill(bool entireProcessTree) => MarkExited(-1);
-
- public void Dispose()
- {
- }
-
- public void MarkExited(int exitCode)
- {
- HasExited = true;
- ExitCode = exitCode;
- _exited.TrySetResult();
- }
}
}
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs
new file mode 100644
index 0000000..f65897f
--- /dev/null
+++ b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/FakeWorkerProcess.cs
@@ -0,0 +1,47 @@
+using ZB.MOM.WW.MxGateway.Server.Workers;
+
+namespace ZB.MOM.WW.MxGateway.Tests.TestSupport;
+
+///
+/// Lightweight in-process stand-in for used by fake worker
+/// launchers in end-to-end tests. Call from the fake worker
+/// body once the shutdown-ack handshake is complete so that callers awaiting
+/// observe real exit rather than a timeout.
+///
+public sealed class FakeWorkerProcess(int processId) : IWorkerProcess
+{
+ private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ /// Gets the process identifier.
+ public int Id { get; } = processId;
+
+ /// Gets a value indicating whether the process has exited.
+ public bool HasExited { get; private set; }
+
+ /// Gets the exit code of the process, or if it has not exited.
+ public int? ExitCode { get; private set; }
+
+ ///
+ public ValueTask WaitForExitAsync(CancellationToken cancellationToken) =>
+ new(_exited.Task.WaitAsync(cancellationToken));
+
+ ///
+ public void Kill(bool entireProcessTree) => MarkExited(-1);
+
+ ///
+ public void Dispose()
+ {
+ }
+
+ ///
+ /// Marks the process as exited with the specified exit code and unblocks
+ /// any callers of .
+ ///
+ /// The process exit code.
+ public void MarkExited(int exitCode)
+ {
+ HasExited = true;
+ ExitCode = exitCode;
+ _exited.TrySetResult();
+ }
+}
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs
index f537bcd..06539b7 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs
@@ -70,16 +70,23 @@ public sealed class RecordingServerStreamWriter : IServerStreamWriter
///
/// Waits until at least messages have been written, then returns
- /// the current snapshot. The wait is bounded by ; if fewer than
- /// messages arrive within the timeout the call throws
- /// (surfaced as
- /// from ).
+ /// the current snapshot. The wait is bounded by a single deadline of now + timeout;
+ /// intermediate wakeups (when a message arrives but the count is not yet met) consume from
+ /// that same deadline so the total elapsed time never exceeds .
+ /// If fewer than messages arrive before the deadline the call
+ /// throws .
///
/// Minimum number of messages to wait for.
- /// Maximum time to wait.
+ /// Maximum total time to wait, measured from the moment of the call.
/// A snapshot of all messages received so far (at least ).
public async Task> WaitForMessageCountAsync(int count, TimeSpan timeout)
{
+ // Capture a single deadline so every iteration of the loop below draws from the
+ // same budget — using WaitAsync(timeout) per iteration would reset the clock on
+ // each intermediate wakeup, effectively giving N×timeout total budget.
+ using CancellationTokenSource deadlineCts = new(timeout);
+ CancellationToken deadlineToken = deadlineCts.Token;
+
TaskCompletionSource>? tcs = null;
lock (_syncRoot)
@@ -93,17 +100,17 @@ public sealed class RecordingServerStreamWriter : IServerStreamWriter
_countWaiters.Add(tcs);
}
- // Poll: re-check each time any message arrives. The TCS is satisfied on EVERY write,
+ // Re-check each time any message arrives. The TCS is satisfied on every write,
// but the caller may need more messages, so we loop until the count is met.
while (true)
{
- IReadOnlyList snapshot = await tcs.Task.WaitAsync(timeout).ConfigureAwait(false);
+ IReadOnlyList snapshot = await tcs.Task.WaitAsync(deadlineToken).ConfigureAwait(false);
if (snapshot.Count >= count)
{
return snapshot;
}
- // Not enough yet — register a new waiter and keep waiting.
+ // Not enough yet — register a new waiter and keep waiting against the same deadline.
lock (_syncRoot)
{
if (_messages.Count >= count)