diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs new file mode 100644 index 0000000..c5320c7 --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndMultiSubscriberTests.cs @@ -0,0 +1,720 @@ +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.MxGateway.Contracts; +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.MxGateway.Server.Configuration; +using ZB.MOM.WW.MxGateway.Server.Grpc; +using ZB.MOM.WW.MxGateway.Server.Metrics; +using ZB.MOM.WW.MxGateway.Server.Security.Authentication; +using ZB.MOM.WW.MxGateway.Server.Security.Authorization; +using ZB.MOM.WW.MxGateway.Server.Sessions; +using ZB.MOM.WW.MxGateway.Server.Workers; +using ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes; +using ZB.MOM.WW.MxGateway.Tests.TestSupport; + +namespace ZB.MOM.WW.MxGateway.Tests.Gateway; + +/// +/// End-to-end multi-subscriber fan-out tests through the real gRPC StreamEvents path via +/// the fake worker harness. Covers fan-out to two concurrent subscribers, independent +/// cancellation isolation, and the per-session subscriber cap. +/// +public sealed class GatewayEndToEndMultiSubscriberTests +{ + private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10); + private const int ServerHandle = 3001; + private const int ItemHandle = 4002; + private const int EventCount = 3; + + /// + /// Two concurrent StreamEvents RPCs on one session both receive every worker event + /// the fake worker emits, in order. + /// + [Fact] + public async Task StreamEvents_TwoSubscribers_BothReceiveAllEvents() + { + MultiEventFakeWorkerProcessLauncher launcher = new(EventCount); + await using MultiSubscriberGatewayServiceFixture fixture = new(launcher); + + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "multi-sub-fanout", + ClientCorrelationId = "open-fanout", + CommandTimeout = Duration.FromTimeSpan(TestTimeout), + }, + new TestServerCallContext()); + + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + string sessionId = openReply.SessionId; + + // Attach both event streams before triggering the Advise that causes events. + // This guarantees both subscribers are registered with the distributor before + // the worker emits anything. + RecordingServerStreamWriter writer1 = new(); + RecordingServerStreamWriter writer2 = new(); + + // The streams block internally until the session starts emitting; start them on the + // thread pool so the test can proceed to trigger the Advise. + Task stream1Task = Task.Run(async () => + await fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + writer1, + new TestServerCallContext())); + + Task stream2Task = Task.Run(async () => + await fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + writer2, + new TestServerCallContext())); + + // Give the stream tasks a moment to attach so they are subscribed before Advise. + await Task.Delay(100); + + MxCommandReply registerReply = await fixture.Service.Invoke( + CreateRegisterRequest(sessionId), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); + + MxCommandReply addItemReply = await fixture.Service.Invoke( + CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); + + MxCommandReply adviseReply = await fixture.Service.Invoke( + CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); + + // Both writers must receive all events. + IReadOnlyList events1 = await writer1.WaitForMessageCountAsync(EventCount, TestTimeout); + IReadOnlyList events2 = await writer2.WaitForMessageCountAsync(EventCount, TestTimeout); + + // Close the session, which completes both stream tasks. + await fixture.Service.CloseSession( + new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-fanout" }, + new TestServerCallContext()); + + await stream1Task.WaitAsync(TestTimeout); + await stream2Task.WaitAsync(TestTimeout); + await launcher.WorkerTask.WaitAsync(TestTimeout); + + // Both subscribers must have received all events. + Assert.Equal(EventCount, events1.Count); + Assert.Equal(EventCount, events2.Count); + + // Events must arrive in the same order on both subscribers. + for (int i = 0; i < EventCount; i++) + { + Assert.Equal(MxEventFamily.OnDataChange, events1[i].Family); + Assert.Equal(MxEventFamily.OnDataChange, events2[i].Family); + // Sequence numbers must match between the two subscribers (same fan-out order). + Assert.Equal(events1[i].WorkerSequence, events2[i].WorkerSequence); + // Sequences must be strictly ascending across events. + if (i > 0) + { + Assert.True(events1[i].WorkerSequence > events1[i - 1].WorkerSequence); + } + + Assert.Equal($"scripted-value-{i + 1}", events1[i].Value.StringValue); + Assert.Equal($"scripted-value-{i + 1}", events2[i].Value.StringValue); + } + } + + /// + /// When one of two subscribers cancels its stream, the other subscriber continues + /// to receive subsequent events and the session remains usable. + /// + [Fact] + public async Task StreamEvents_OneSubscriberCancels_OtherContinuesReceivingEvents() + { + GatedEventFakeWorkerProcessLauncher launcher = new(); + await using MultiSubscriberGatewayServiceFixture fixture = new(launcher); + + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "multi-sub-cancel", + ClientCorrelationId = "open-cancel", + CommandTimeout = Duration.FromTimeSpan(TestTimeout), + }, + new TestServerCallContext()); + + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + string sessionId = openReply.SessionId; + + using CancellationTokenSource sub1Cts = new(); + RecordingServerStreamWriter writer1 = new(); + RecordingServerStreamWriter writer2 = new(); + + // Sub1 uses a CancellationToken we can cancel independently. + Task stream1Task = Task.Run(async () => + await fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + writer1, + new TestServerCallContext(cancellationToken: sub1Cts.Token))); + + Task stream2Task = Task.Run(async () => + await fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + writer2, + new TestServerCallContext())); + + // Give both streams a moment to attach. + await Task.Delay(100); + + // Wire up the session: Register + AddItem + Advise. + MxCommandReply registerReply = await fixture.Service.Invoke( + CreateRegisterRequest(sessionId), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); + + MxCommandReply addItemReply = await fixture.Service.Invoke( + CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); + + MxCommandReply adviseReply = await fixture.Service.Invoke( + CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); + + // Emit first event; both subscribers should see it. + launcher.AllowNextEvent(); + await writer1.WaitForFirstMessageAsync(TestTimeout); + await writer2.WaitForFirstMessageAsync(TestTimeout); + + // Cancel sub1 and wait for it to finish. + await sub1Cts.CancelAsync(); + try + { + await stream1Task.WaitAsync(TestTimeout); + } + catch (OperationCanceledException) + { + // Expected: the iterator surfaces the cancellation. + } + catch (RpcException rpc) when (rpc.StatusCode == StatusCode.Cancelled) + { + // Also acceptable depending on gRPC exception wrapping. + } + + // Emit a second event — only sub2 should see it. + launcher.AllowNextEvent(); + await writer2.WaitForMessageCountAsync(2, TestTimeout); + + // Sub1 must not have received the second event. + Assert.Single(writer1.Messages); + Assert.Equal(2, writer2.Messages.Count); + Assert.Equal(MxEventFamily.OnDataChange, writer2.Messages[1].Family); + + // Tear down: signal the worker to stop emitting, then close the session. + launcher.StopEmitting(); + + await fixture.Service.CloseSession( + new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-cancel" }, + new TestServerCallContext()); + + try + { + await stream2Task.WaitAsync(TestTimeout); + } + catch (OperationCanceledException) + { + } + + await launcher.WorkerTask.WaitAsync(TestTimeout); + } + + /// + /// With MaxEventSubscribersPerSession=2 a third concurrent StreamEvents call is + /// rejected with gRPC status while the first + /// two subscribers keep streaming. + /// + [Fact] + public async Task StreamEvents_ThirdSubscriberWhenCapIsTwo_ReceivesResourceExhausted() + { + const int cap = 2; + GatedEventFakeWorkerProcessLauncher launcher = new(); + await using MultiSubscriberGatewayServiceFixture fixture = new(launcher, maxEventSubscribersPerSession: cap); + + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "multi-sub-cap", + ClientCorrelationId = "open-cap", + CommandTimeout = Duration.FromTimeSpan(TestTimeout), + }, + new TestServerCallContext()); + + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + string sessionId = openReply.SessionId; + + RecordingServerStreamWriter writer1 = new(); + RecordingServerStreamWriter writer2 = new(); + + // Attach both streams first (before any events are emitted). + Task stream1Task = Task.Run(async () => + await fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + writer1, + new TestServerCallContext())); + + Task stream2Task = Task.Run(async () => + await fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + writer2, + new TestServerCallContext())); + + // Give both streams time to attach. + await Task.Delay(100); + + // Wire up the session so the worker is ready. + MxCommandReply registerReply = await fixture.Service.Invoke( + CreateRegisterRequest(sessionId), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); + + MxCommandReply addItemReply = await fixture.Service.Invoke( + CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); + + MxCommandReply adviseReply = await fixture.Service.Invoke( + CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), + new TestServerCallContext()); + Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); + + // Emit one event and confirm both attached streams see it. + launcher.AllowNextEvent(); + await writer1.WaitForFirstMessageAsync(TestTimeout); + await writer2.WaitForFirstMessageAsync(TestTimeout); + + // A third subscriber must be rejected with ResourceExhausted. + RecordingServerStreamWriter writer3 = new(); + RpcException capException = await Assert.ThrowsAsync(async () => + await fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + writer3, + new TestServerCallContext())); + + Assert.Equal(StatusCode.ResourceExhausted, capException.StatusCode); + + // The first two streams must still be live (not completed). + Assert.False(stream1Task.IsCompleted, "Sub1 must still be streaming after sub3 was rejected."); + Assert.False(stream2Task.IsCompleted, "Sub2 must still be streaming after sub3 was rejected."); + + // Tear down. + launcher.StopEmitting(); + + await fixture.Service.CloseSession( + new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-cap" }, + new TestServerCallContext()); + + await stream1Task.WaitAsync(TestTimeout); + await stream2Task.WaitAsync(TestTimeout); + await launcher.WorkerTask.WaitAsync(TestTimeout); + } + + // ---- helpers ---- + + private static MxCommandRequest CreateRegisterRequest(string sessionId) => + new() + { + SessionId = sessionId, + ClientCorrelationId = "register-ms", + Command = new MxCommand + { + Kind = MxCommandKind.Register, + Register = new RegisterCommand { ClientName = "multi-sub-e2e-client" }, + }, + }; + + private static MxCommandRequest CreateAddItemRequest(string sessionId, int serverHandle) => + new() + { + SessionId = sessionId, + ClientCorrelationId = "add-item-ms", + Command = new MxCommand + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = serverHandle, + ItemDefinition = "Galaxy.Tag.Value", + }, + }, + }; + + private static MxCommandRequest CreateAdviseRequest( + string sessionId, + int serverHandle, + int itemHandle) => + new() + { + SessionId = sessionId, + ClientCorrelationId = "advise-ms", + Command = new MxCommand + { + Kind = MxCommandKind.Advise, + Advise = new AdviseCommand { ServerHandle = serverHandle, ItemHandle = itemHandle }, + }, + }; + + // ---- fixture ---- + + /// + /// Gateway service fixture with AllowMultipleEventSubscribers=true and a + /// configurable per-session subscriber cap. + /// + private sealed class MultiSubscriberGatewayServiceFixture : IAsyncDisposable + { + private readonly GatewayMetrics _metrics = new(); + private readonly SessionRegistry _registry = new(); + + public MultiSubscriberGatewayServiceFixture( + IWorkerProcessLauncher launcher, + int maxEventSubscribersPerSession = 8) + { + IOptions options = Options.Create(CreateOptions(maxEventSubscribersPerSession)); + SessionWorkerClientFactory workerClientFactory = new( + launcher, + options, + _metrics, + NullLoggerFactory.Instance); + SessionManager sessionManager = new( + _registry, + workerClientFactory, + options, + _metrics, + logger: NullLogger.Instance, + dashboardEventBroadcaster: NullDashboardEventBroadcaster.Instance); + MxAccessGrpcMapper mapper = new(); + EventStreamService eventStreamService = new( + sessionManager, + options, + _metrics); + + Service = new MxAccessGatewayService( + sessionManager, + new GatewayRequestIdentityAccessor(), + new AllowAllConstraintEnforcer(), + new MxAccessGrpcRequestValidator(), + mapper, + eventStreamService, + _metrics, + NullLogger.Instance, + new FakeGatewayAlarmService()); + } + + public MxAccessGatewayService Service { get; } + + public async ValueTask DisposeAsync() + { + foreach (GatewaySession session in _registry.Snapshot()) + { + await session.DisposeAsync(); + } + + _metrics.Dispose(); + } + + private static GatewayOptions CreateOptions(int maxEventSubscribersPerSession) => + new() + { + Worker = new WorkerOptions + { + StartupTimeoutSeconds = 5, + ShutdownTimeoutSeconds = 5, + HeartbeatIntervalSeconds = 30, + HeartbeatGraceSeconds = 30, + MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, + }, + Sessions = new SessionOptions + { + DefaultCommandTimeoutSeconds = 5, + MaxSessions = 4, + AllowMultipleEventSubscribers = true, + MaxEventSubscribersPerSession = maxEventSubscribersPerSession, + }, + Events = new EventOptions + { + QueueCapacity = 32, + }, + }; + } + + // ---- fake worker launchers ---- + + /// + /// Fake worker that emits a fixed number of distinct OnDataChange events after an Advise + /// command, then waits for shutdown. Events carry an indexed string value so the test can + /// verify fan-out order across two subscribers. + /// + private sealed class MultiEventFakeWorkerProcessLauncher(int eventCount) : IWorkerProcessLauncher + { + public const int ProcessId = 7710; + + private readonly FakeWorkerProcess _process = new(ProcessId); + + public Task WorkerTask { get; private set; } = Task.CompletedTask; + + public Task LaunchAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken = default) + { + WorkerTask = RunWorkerAsync(request, cancellationToken); + + return Task.FromResult(new WorkerProcessHandle( + _process, + new WorkerProcessCommandLine("multi-event-fake-worker.exe", []), + DateTimeOffset.UtcNow)); + } + + private async Task RunWorkerAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( + request.SessionId, + request.Nonce, + request.PipeName, + request.ProtocolVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false); + + while (!cancellationToken.IsCancellationRequested) + { + WorkerEnvelope envelope = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown) + { + await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + _process.MarkExited(0); + return; + } + + if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand) + { + throw new InvalidOperationException($"Unexpected envelope {envelope.BodyCase}."); + } + + MxCommand command = envelope.WorkerCommand.Command; + + await harness.ReplyToCommandAsync( + envelope, + configureReply: reply => ConfigureReply(reply, command.Kind), + cancellationToken: cancellationToken).ConfigureAwait(false); + + // After Advise emit all events immediately. + if (command.Kind == MxCommandKind.Advise) + { + for (int i = 1; i <= eventCount; i++) + { + int index = i; + await harness.EmitEventAsync( + MxEventFamily.OnDataChange, + cancellationToken, + mxEvent => + { + mxEvent.ServerHandle = command.Advise.ServerHandle; + mxEvent.ItemHandle = command.Advise.ItemHandle; + mxEvent.Quality = 192; + mxEvent.Value = new MxValue + { + DataType = MxDataType.String, + StringValue = $"scripted-value-{index}", + }; + mxEvent.OnDataChange = new OnDataChangeEvent(); + }).ConfigureAwait(false); + } + } + } + } + + private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind) + { + switch (kind) + { + case MxCommandKind.Register: + reply.Register = new RegisterReply { ServerHandle = ServerHandle }; + break; + case MxCommandKind.AddItem: + reply.AddItem = new AddItemReply { ItemHandle = ItemHandle }; + break; + } + } + } + + /// + /// Fake worker that emits events one at a time, gated by + /// . The test drives the timing so assertions are + /// deterministic. Call before closing the session so the + /// worker loop exits cleanly and can process the shutdown envelope. + /// + private sealed class GatedEventFakeWorkerProcessLauncher : IWorkerProcessLauncher + { + public const int ProcessId = 7720; + + private readonly FakeWorkerProcess _process = new(ProcessId); + + // Capacity 64 so AllowNextEvent can be called ahead of time without blocking. + private readonly SemaphoreSlim _emitGate = new(0, 64); + private volatile bool _stopEmitting; + + public Task WorkerTask { get; private set; } = Task.CompletedTask; + + /// Releases the gate so the worker emits one event. + public void AllowNextEvent() => _emitGate.Release(); + + /// + /// Signals the worker to stop waiting for the emit gate and process the + /// shutdown envelope. Must be called before CloseSession. + /// + public void StopEmitting() + { + _stopEmitting = true; + _emitGate.Release(); // unblock a pending gate wait if any + } + + public Task LaunchAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken = default) + { + WorkerTask = RunWorkerAsync(request, cancellationToken); + + return Task.FromResult(new WorkerProcessHandle( + _process, + new WorkerProcessCommandLine("gated-event-fake-worker.exe", []), + DateTimeOffset.UtcNow)); + } + + private async Task RunWorkerAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( + request.SessionId, + request.Nonce, + request.PipeName, + request.ProtocolVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false); + + int advisedServerHandle = 0; + int advisedItemHandle = 0; + int emittedCount = 0; + + // Read envelopes one at a time. Between each envelope, if we have a + // subscription and the gate is ready, emit an event before reading the + // next envelope. When _stopEmitting is set, drain the gate and read + // remaining envelopes (including shutdown) without emitting. + while (!cancellationToken.IsCancellationRequested) + { + // While subscribed and not stopped, try to emit gated events using + // a short-timeout peek at the gate — yield to incoming envelopes to + // avoid starving shutdown processing. + while (advisedServerHandle != 0 + && !_stopEmitting + && await _emitGate.WaitAsync(millisecondsTimeout: 0).ConfigureAwait(false)) + { + int index = ++emittedCount; + await harness.EmitEventAsync( + MxEventFamily.OnDataChange, + cancellationToken, + mxEvent => + { + mxEvent.ServerHandle = advisedServerHandle; + mxEvent.ItemHandle = advisedItemHandle; + mxEvent.Quality = 192; + mxEvent.Value = new MxValue + { + DataType = MxDataType.String, + StringValue = $"gated-value-{index}", + }; + mxEvent.OnDataChange = new OnDataChangeEvent(); + }).ConfigureAwait(false); + } + + // Use a short timeout so the emit loop above is re-evaluated + // periodically — but long enough not to spam. + WorkerEnvelope? envelope = null; + try + { + using CancellationTokenSource readCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + readCts.CancelAfter(TimeSpan.FromMilliseconds(50)); + envelope = await harness.ReadGatewayEnvelopeAsync(readCts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + { + // Timed out waiting for an envelope — loop back to check gate / emit. + continue; + } + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown) + { + await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + _process.MarkExited(0); + return; + } + + if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand) + { + throw new InvalidOperationException($"Unexpected envelope {envelope.BodyCase}."); + } + + MxCommand command = envelope.WorkerCommand.Command; + await harness.ReplyToCommandAsync( + envelope, + configureReply: reply => ConfigureReply(reply, command.Kind), + cancellationToken: cancellationToken).ConfigureAwait(false); + + if (command.Kind == MxCommandKind.Advise) + { + advisedServerHandle = command.Advise.ServerHandle; + advisedItemHandle = command.Advise.ItemHandle; + } + } + } + + private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind) + { + switch (kind) + { + case MxCommandKind.Register: + reply.Register = new RegisterReply { ServerHandle = ServerHandle }; + break; + case MxCommandKind.AddItem: + reply.AddItem = new AddItemReply { ItemHandle = ItemHandle }; + break; + } + } + } + + private sealed class FakeWorkerProcess(int processId) : IWorkerProcess + { + private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public int Id { get; } = processId; + public bool HasExited { get; private set; } + public int? ExitCode { get; private set; } + + public ValueTask WaitForExitAsync(CancellationToken cancellationToken) => + new(_exited.Task.WaitAsync(cancellationToken)); + + public void Kill(bool entireProcessTree) => MarkExited(-1); + + public void Dispose() + { + } + + public void MarkExited(int exitCode) + { + HasExited = true; + ExitCode = exitCode; + _exited.TrySetResult(); + } + } +} diff --git a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs index d18eeef..f537bcd 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/TestSupport/RecordingServerStreamWriter.cs @@ -4,7 +4,7 @@ namespace ZB.MOM.WW.MxGateway.Tests.TestSupport; /// /// Thread-safe that records every written message -/// and lets a test await the first message with a timeout. +/// and lets a test await the first message or a specific message count with a timeout. /// /// The streamed message type. public sealed class RecordingServerStreamWriter : IServerStreamWriter @@ -12,6 +12,7 @@ public sealed class RecordingServerStreamWriter : IServerStreamWriter private readonly object _syncRoot = new(); private readonly TaskCompletionSource _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly List _messages = []; + private readonly List>> _countWaiters = []; /// Gets the messages written to this stream, in order. public IReadOnlyList Messages @@ -33,12 +34,31 @@ public sealed class RecordingServerStreamWriter : IServerStreamWriter /// A completed task. public Task WriteAsync(T message) { + List>>? satisfied = null; + IReadOnlyList? snapshot = null; + lock (_syncRoot) { _messages.Add(message); + _firstMessage.TrySetResult(message); + + // Check whether any count waiters are now satisfied. + if (_countWaiters.Count > 0) + { + snapshot = _messages.ToArray(); + satisfied = _countWaiters.ToList(); + _countWaiters.Clear(); + } + } + + if (satisfied is not null && snapshot is not null) + { + foreach (TaskCompletionSource> waiter in satisfied) + { + waiter.TrySetResult(snapshot); + } } - _firstMessage.TrySetResult(message); return Task.CompletedTask; } @@ -47,4 +67,53 @@ public sealed class RecordingServerStreamWriter : IServerStreamWriter /// The first message written to this stream. public async Task WaitForFirstMessageAsync(TimeSpan timeout) => await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false); + + /// + /// Waits until at least messages have been written, then returns + /// the current snapshot. The wait is bounded by ; if fewer than + /// messages arrive within the timeout the call throws + /// (surfaced as + /// from ). + /// + /// Minimum number of messages to wait for. + /// Maximum time to wait. + /// A snapshot of all messages received so far (at least ). + public async Task> WaitForMessageCountAsync(int count, TimeSpan timeout) + { + TaskCompletionSource>? tcs = null; + + lock (_syncRoot) + { + if (_messages.Count >= count) + { + return _messages.ToArray(); + } + + tcs = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + _countWaiters.Add(tcs); + } + + // Poll: re-check each time any message arrives. The TCS is satisfied on EVERY write, + // but the caller may need more messages, so we loop until the count is met. + while (true) + { + IReadOnlyList snapshot = await tcs.Task.WaitAsync(timeout).ConfigureAwait(false); + if (snapshot.Count >= count) + { + return snapshot; + } + + // Not enough yet — register a new waiter and keep waiting. + lock (_syncRoot) + { + if (_messages.Count >= count) + { + return _messages.ToArray(); + } + + tcs = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + _countWaiters.Add(tcs); + } + } + } }