test(gateway): end-to-end multi-subscriber fan-out via fake worker

Adds GatewayEndToEndMultiSubscriberTests covering three scenarios
through the real gRPC StreamEvents path with AllowMultipleEventSubscribers=true:
- Fan-out: two concurrent StreamEvents RPCs both receive every event the fake
  worker emits, in the same order (WorkerSequence matches, values indexed).
- Independent cancellation: cancelling one subscriber's stream leaves the other
  receiving subsequent events; the session stays usable.
- Cap enforcement: with MaxEventSubscribersPerSession=2 a third concurrent
  StreamEvents is rejected with gRPC ResourceExhausted while the first two
  keep streaming.

Extends RecordingServerStreamWriter<T> with WaitForMessageCountAsync to
allow deterministic bounded-timeout awaits for an N-message count without
fixed sleeps.
This commit is contained in:
Joseph Doherty
2026-06-15 16:09:58 -04:00
parent 281e00b300
commit 9dd97a27f1
2 changed files with 791 additions and 2 deletions
@@ -0,0 +1,720 @@
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Security.Authentication;
using ZB.MOM.WW.MxGateway.Server.Security.Authorization;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway;
/// <summary>
/// End-to-end multi-subscriber fan-out tests through the real gRPC StreamEvents path via
/// the fake worker harness. Covers fan-out to two concurrent subscribers, independent
/// cancellation isolation, and the per-session subscriber cap.
/// </summary>
public sealed class GatewayEndToEndMultiSubscriberTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10);
private const int ServerHandle = 3001;
private const int ItemHandle = 4002;
private const int EventCount = 3;
/// <summary>
/// Two concurrent StreamEvents RPCs on one session both receive every worker event
/// the fake worker emits, in order.
/// </summary>
[Fact]
public async Task StreamEvents_TwoSubscribers_BothReceiveAllEvents()
{
MultiEventFakeWorkerProcessLauncher launcher = new(EventCount);
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher);
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "multi-sub-fanout",
ClientCorrelationId = "open-fanout",
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
},
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
string sessionId = openReply.SessionId;
// Attach both event streams before triggering the Advise that causes events.
// This guarantees both subscribers are registered with the distributor before
// the worker emits anything.
RecordingServerStreamWriter<MxEvent> writer1 = new();
RecordingServerStreamWriter<MxEvent> writer2 = new();
// The streams block internally until the session starts emitting; start them on the
// thread pool so the test can proceed to trigger the Advise.
Task stream1Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer1,
new TestServerCallContext()));
Task stream2Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer2,
new TestServerCallContext()));
// Give the stream tasks a moment to attach so they are subscribed before Advise.
await Task.Delay(100);
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
// Both writers must receive all events.
IReadOnlyList<MxEvent> events1 = await writer1.WaitForMessageCountAsync(EventCount, TestTimeout);
IReadOnlyList<MxEvent> events2 = await writer2.WaitForMessageCountAsync(EventCount, TestTimeout);
// Close the session, which completes both stream tasks.
await fixture.Service.CloseSession(
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-fanout" },
new TestServerCallContext());
await stream1Task.WaitAsync(TestTimeout);
await stream2Task.WaitAsync(TestTimeout);
await launcher.WorkerTask.WaitAsync(TestTimeout);
// Both subscribers must have received all events.
Assert.Equal(EventCount, events1.Count);
Assert.Equal(EventCount, events2.Count);
// Events must arrive in the same order on both subscribers.
for (int i = 0; i < EventCount; i++)
{
Assert.Equal(MxEventFamily.OnDataChange, events1[i].Family);
Assert.Equal(MxEventFamily.OnDataChange, events2[i].Family);
// Sequence numbers must match between the two subscribers (same fan-out order).
Assert.Equal(events1[i].WorkerSequence, events2[i].WorkerSequence);
// Sequences must be strictly ascending across events.
if (i > 0)
{
Assert.True(events1[i].WorkerSequence > events1[i - 1].WorkerSequence);
}
Assert.Equal($"scripted-value-{i + 1}", events1[i].Value.StringValue);
Assert.Equal($"scripted-value-{i + 1}", events2[i].Value.StringValue);
}
}
/// <summary>
/// When one of two subscribers cancels its stream, the other subscriber continues
/// to receive subsequent events and the session remains usable.
/// </summary>
[Fact]
public async Task StreamEvents_OneSubscriberCancels_OtherContinuesReceivingEvents()
{
GatedEventFakeWorkerProcessLauncher launcher = new();
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher);
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "multi-sub-cancel",
ClientCorrelationId = "open-cancel",
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
},
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
string sessionId = openReply.SessionId;
using CancellationTokenSource sub1Cts = new();
RecordingServerStreamWriter<MxEvent> writer1 = new();
RecordingServerStreamWriter<MxEvent> writer2 = new();
// Sub1 uses a CancellationToken we can cancel independently.
Task stream1Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer1,
new TestServerCallContext(cancellationToken: sub1Cts.Token)));
Task stream2Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer2,
new TestServerCallContext()));
// Give both streams a moment to attach.
await Task.Delay(100);
// Wire up the session: Register + AddItem + Advise.
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
// Emit first event; both subscribers should see it.
launcher.AllowNextEvent();
await writer1.WaitForFirstMessageAsync(TestTimeout);
await writer2.WaitForFirstMessageAsync(TestTimeout);
// Cancel sub1 and wait for it to finish.
await sub1Cts.CancelAsync();
try
{
await stream1Task.WaitAsync(TestTimeout);
}
catch (OperationCanceledException)
{
// Expected: the iterator surfaces the cancellation.
}
catch (RpcException rpc) when (rpc.StatusCode == StatusCode.Cancelled)
{
// Also acceptable depending on gRPC exception wrapping.
}
// Emit a second event — only sub2 should see it.
launcher.AllowNextEvent();
await writer2.WaitForMessageCountAsync(2, TestTimeout);
// Sub1 must not have received the second event.
Assert.Single(writer1.Messages);
Assert.Equal(2, writer2.Messages.Count);
Assert.Equal(MxEventFamily.OnDataChange, writer2.Messages[1].Family);
// Tear down: signal the worker to stop emitting, then close the session.
launcher.StopEmitting();
await fixture.Service.CloseSession(
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-cancel" },
new TestServerCallContext());
try
{
await stream2Task.WaitAsync(TestTimeout);
}
catch (OperationCanceledException)
{
}
await launcher.WorkerTask.WaitAsync(TestTimeout);
}
/// <summary>
/// With <c>MaxEventSubscribersPerSession=2</c> a third concurrent StreamEvents call is
/// rejected with gRPC status <see cref="StatusCode.ResourceExhausted"/> while the first
/// two subscribers keep streaming.
/// </summary>
[Fact]
public async Task StreamEvents_ThirdSubscriberWhenCapIsTwo_ReceivesResourceExhausted()
{
const int cap = 2;
GatedEventFakeWorkerProcessLauncher launcher = new();
await using MultiSubscriberGatewayServiceFixture fixture = new(launcher, maxEventSubscribersPerSession: cap);
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "multi-sub-cap",
ClientCorrelationId = "open-cap",
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
},
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
string sessionId = openReply.SessionId;
RecordingServerStreamWriter<MxEvent> writer1 = new();
RecordingServerStreamWriter<MxEvent> writer2 = new();
// Attach both streams first (before any events are emitted).
Task stream1Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer1,
new TestServerCallContext()));
Task stream2Task = Task.Run(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer2,
new TestServerCallContext()));
// Give both streams time to attach.
await Task.Delay(100);
// Wire up the session so the worker is ready.
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle),
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
// Emit one event and confirm both attached streams see it.
launcher.AllowNextEvent();
await writer1.WaitForFirstMessageAsync(TestTimeout);
await writer2.WaitForFirstMessageAsync(TestTimeout);
// A third subscriber must be rejected with ResourceExhausted.
RecordingServerStreamWriter<MxEvent> writer3 = new();
RpcException capException = await Assert.ThrowsAsync<RpcException>(async () =>
await fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
writer3,
new TestServerCallContext()));
Assert.Equal(StatusCode.ResourceExhausted, capException.StatusCode);
// The first two streams must still be live (not completed).
Assert.False(stream1Task.IsCompleted, "Sub1 must still be streaming after sub3 was rejected.");
Assert.False(stream2Task.IsCompleted, "Sub2 must still be streaming after sub3 was rejected.");
// Tear down.
launcher.StopEmitting();
await fixture.Service.CloseSession(
new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = "close-cap" },
new TestServerCallContext());
await stream1Task.WaitAsync(TestTimeout);
await stream2Task.WaitAsync(TestTimeout);
await launcher.WorkerTask.WaitAsync(TestTimeout);
}
// ---- helpers ----
private static MxCommandRequest CreateRegisterRequest(string sessionId) =>
new()
{
SessionId = sessionId,
ClientCorrelationId = "register-ms",
Command = new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = "multi-sub-e2e-client" },
},
};
private static MxCommandRequest CreateAddItemRequest(string sessionId, int serverHandle) =>
new()
{
SessionId = sessionId,
ClientCorrelationId = "add-item-ms",
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = "Galaxy.Tag.Value",
},
},
};
private static MxCommandRequest CreateAdviseRequest(
string sessionId,
int serverHandle,
int itemHandle) =>
new()
{
SessionId = sessionId,
ClientCorrelationId = "advise-ms",
Command = new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand { ServerHandle = serverHandle, ItemHandle = itemHandle },
},
};
// ---- fixture ----
/// <summary>
/// Gateway service fixture with <c>AllowMultipleEventSubscribers=true</c> and a
/// configurable per-session subscriber cap.
/// </summary>
private sealed class MultiSubscriberGatewayServiceFixture : IAsyncDisposable
{
private readonly GatewayMetrics _metrics = new();
private readonly SessionRegistry _registry = new();
public MultiSubscriberGatewayServiceFixture(
IWorkerProcessLauncher launcher,
int maxEventSubscribersPerSession = 8)
{
IOptions<GatewayOptions> options = Options.Create(CreateOptions(maxEventSubscribersPerSession));
SessionWorkerClientFactory workerClientFactory = new(
launcher,
options,
_metrics,
NullLoggerFactory.Instance);
SessionManager sessionManager = new(
_registry,
workerClientFactory,
options,
_metrics,
logger: NullLogger<SessionManager>.Instance,
dashboardEventBroadcaster: NullDashboardEventBroadcaster.Instance);
MxAccessGrpcMapper mapper = new();
EventStreamService eventStreamService = new(
sessionManager,
options,
_metrics);
Service = new MxAccessGatewayService(
sessionManager,
new GatewayRequestIdentityAccessor(),
new AllowAllConstraintEnforcer(),
new MxAccessGrpcRequestValidator(),
mapper,
eventStreamService,
_metrics,
NullLogger<MxAccessGatewayService>.Instance,
new FakeGatewayAlarmService());
}
public MxAccessGatewayService Service { get; }
public async ValueTask DisposeAsync()
{
foreach (GatewaySession session in _registry.Snapshot())
{
await session.DisposeAsync();
}
_metrics.Dispose();
}
private static GatewayOptions CreateOptions(int maxEventSubscribersPerSession) =>
new()
{
Worker = new WorkerOptions
{
StartupTimeoutSeconds = 5,
ShutdownTimeoutSeconds = 5,
HeartbeatIntervalSeconds = 30,
HeartbeatGraceSeconds = 30,
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
},
Sessions = new SessionOptions
{
DefaultCommandTimeoutSeconds = 5,
MaxSessions = 4,
AllowMultipleEventSubscribers = true,
MaxEventSubscribersPerSession = maxEventSubscribersPerSession,
},
Events = new EventOptions
{
QueueCapacity = 32,
},
};
}
// ---- fake worker launchers ----
/// <summary>
/// Fake worker that emits a fixed number of distinct OnDataChange events after an Advise
/// command, then waits for shutdown. Events carry an indexed string value so the test can
/// verify fan-out order across two subscribers.
/// </summary>
private sealed class MultiEventFakeWorkerProcessLauncher(int eventCount) : IWorkerProcessLauncher
{
public const int ProcessId = 7710;
private readonly FakeWorkerProcess _process = new(ProcessId);
public Task WorkerTask { get; private set; } = Task.CompletedTask;
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
WorkerTask = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(new WorkerProcessHandle(
_process,
new WorkerProcessCommandLine("multi-event-fake-worker.exe", []),
DateTimeOffset.UtcNow));
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);
while (!cancellationToken.IsCancellationRequested)
{
WorkerEnvelope envelope = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
{
await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
_process.MarkExited(0);
return;
}
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
{
throw new InvalidOperationException($"Unexpected envelope {envelope.BodyCase}.");
}
MxCommand command = envelope.WorkerCommand.Command;
await harness.ReplyToCommandAsync(
envelope,
configureReply: reply => ConfigureReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
// After Advise emit all events immediately.
if (command.Kind == MxCommandKind.Advise)
{
for (int i = 1; i <= eventCount; i++)
{
int index = i;
await harness.EmitEventAsync(
MxEventFamily.OnDataChange,
cancellationToken,
mxEvent =>
{
mxEvent.ServerHandle = command.Advise.ServerHandle;
mxEvent.ItemHandle = command.Advise.ItemHandle;
mxEvent.Quality = 192;
mxEvent.Value = new MxValue
{
DataType = MxDataType.String,
StringValue = $"scripted-value-{index}",
};
mxEvent.OnDataChange = new OnDataChangeEvent();
}).ConfigureAwait(false);
}
}
}
}
private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind)
{
switch (kind)
{
case MxCommandKind.Register:
reply.Register = new RegisterReply { ServerHandle = ServerHandle };
break;
case MxCommandKind.AddItem:
reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
break;
}
}
}
/// <summary>
/// Fake worker that emits events one at a time, gated by
/// <see cref="AllowNextEvent"/>. The test drives the timing so assertions are
/// deterministic. Call <see cref="StopEmitting"/> before closing the session so the
/// worker loop exits cleanly and can process the shutdown envelope.
/// </summary>
private sealed class GatedEventFakeWorkerProcessLauncher : IWorkerProcessLauncher
{
public const int ProcessId = 7720;
private readonly FakeWorkerProcess _process = new(ProcessId);
// Capacity 64 so AllowNextEvent can be called ahead of time without blocking.
private readonly SemaphoreSlim _emitGate = new(0, 64);
private volatile bool _stopEmitting;
public Task WorkerTask { get; private set; } = Task.CompletedTask;
/// <summary>Releases the gate so the worker emits one event.</summary>
public void AllowNextEvent() => _emitGate.Release();
/// <summary>
/// Signals the worker to stop waiting for the emit gate and process the
/// shutdown envelope. Must be called before <c>CloseSession</c>.
/// </summary>
public void StopEmitting()
{
_stopEmitting = true;
_emitGate.Release(); // unblock a pending gate wait if any
}
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
WorkerTask = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(new WorkerProcessHandle(
_process,
new WorkerProcessCommandLine("gated-event-fake-worker.exe", []),
DateTimeOffset.UtcNow));
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);
int advisedServerHandle = 0;
int advisedItemHandle = 0;
int emittedCount = 0;
// Read envelopes one at a time. Between each envelope, if we have a
// subscription and the gate is ready, emit an event before reading the
// next envelope. When _stopEmitting is set, drain the gate and read
// remaining envelopes (including shutdown) without emitting.
while (!cancellationToken.IsCancellationRequested)
{
// While subscribed and not stopped, try to emit gated events using
// a short-timeout peek at the gate — yield to incoming envelopes to
// avoid starving shutdown processing.
while (advisedServerHandle != 0
&& !_stopEmitting
&& await _emitGate.WaitAsync(millisecondsTimeout: 0).ConfigureAwait(false))
{
int index = ++emittedCount;
await harness.EmitEventAsync(
MxEventFamily.OnDataChange,
cancellationToken,
mxEvent =>
{
mxEvent.ServerHandle = advisedServerHandle;
mxEvent.ItemHandle = advisedItemHandle;
mxEvent.Quality = 192;
mxEvent.Value = new MxValue
{
DataType = MxDataType.String,
StringValue = $"gated-value-{index}",
};
mxEvent.OnDataChange = new OnDataChangeEvent();
}).ConfigureAwait(false);
}
// Use a short timeout so the emit loop above is re-evaluated
// periodically — but long enough not to spam.
WorkerEnvelope? envelope = null;
try
{
using CancellationTokenSource readCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
readCts.CancelAfter(TimeSpan.FromMilliseconds(50));
envelope = await harness.ReadGatewayEnvelopeAsync(readCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
// Timed out waiting for an envelope — loop back to check gate / emit.
continue;
}
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
{
await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
_process.MarkExited(0);
return;
}
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
{
throw new InvalidOperationException($"Unexpected envelope {envelope.BodyCase}.");
}
MxCommand command = envelope.WorkerCommand.Command;
await harness.ReplyToCommandAsync(
envelope,
configureReply: reply => ConfigureReply(reply, command.Kind),
cancellationToken: cancellationToken).ConfigureAwait(false);
if (command.Kind == MxCommandKind.Advise)
{
advisedServerHandle = command.Advise.ServerHandle;
advisedItemHandle = command.Advise.ItemHandle;
}
}
}
private static void ConfigureReply(MxCommandReply reply, MxCommandKind kind)
{
switch (kind)
{
case MxCommandKind.Register:
reply.Register = new RegisterReply { ServerHandle = ServerHandle };
break;
case MxCommandKind.AddItem:
reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
break;
}
}
}
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);
public int Id { get; } = processId;
public bool HasExited { get; private set; }
public int? ExitCode { get; private set; }
public ValueTask WaitForExitAsync(CancellationToken cancellationToken) =>
new(_exited.Task.WaitAsync(cancellationToken));
public void Kill(bool entireProcessTree) => MarkExited(-1);
public void Dispose()
{
}
public void MarkExited(int exitCode)
{
HasExited = true;
ExitCode = exitCode;
_exited.TrySetResult();
}
}
}
@@ -4,7 +4,7 @@ namespace ZB.MOM.WW.MxGateway.Tests.TestSupport;
/// <summary>
/// Thread-safe <see cref="IServerStreamWriter{T}"/> that records every written message
/// and lets a test await the first message with a timeout.
/// and lets a test await the first message or a specific message count with a timeout.
/// </summary>
/// <typeparam name="T">The streamed message type.</typeparam>
public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
@@ -12,6 +12,7 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
private readonly object _syncRoot = new();
private readonly TaskCompletionSource<T> _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly List<T> _messages = [];
private readonly List<TaskCompletionSource<IReadOnlyList<T>>> _countWaiters = [];
/// <summary>Gets the messages written to this stream, in order.</summary>
public IReadOnlyList<T> Messages
@@ -33,12 +34,31 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
/// <returns>A completed task.</returns>
public Task WriteAsync(T message)
{
List<TaskCompletionSource<IReadOnlyList<T>>>? satisfied = null;
IReadOnlyList<T>? snapshot = null;
lock (_syncRoot)
{
_messages.Add(message);
_firstMessage.TrySetResult(message);
// Check whether any count waiters are now satisfied.
if (_countWaiters.Count > 0)
{
snapshot = _messages.ToArray();
satisfied = _countWaiters.ToList();
_countWaiters.Clear();
}
}
if (satisfied is not null && snapshot is not null)
{
foreach (TaskCompletionSource<IReadOnlyList<T>> waiter in satisfied)
{
waiter.TrySetResult(snapshot);
}
}
_firstMessage.TrySetResult(message);
return Task.CompletedTask;
}
@@ -47,4 +67,53 @@ public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
/// <returns>The first message written to this stream.</returns>
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout) =>
await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
/// <summary>
/// Waits until at least <paramref name="count"/> messages have been written, then returns
/// the current snapshot. The wait is bounded by <paramref name="timeout"/>; if fewer than
/// <paramref name="count"/> messages arrive within the timeout the call throws
/// <see cref="TimeoutException"/> (surfaced as <see cref="OperationCanceledException"/>
/// from <see cref="Task.WaitAsync(TimeSpan)"/>).
/// </summary>
/// <param name="count">Minimum number of messages to wait for.</param>
/// <param name="timeout">Maximum time to wait.</param>
/// <returns>A snapshot of all messages received so far (at least <paramref name="count"/>).</returns>
public async Task<IReadOnlyList<T>> WaitForMessageCountAsync(int count, TimeSpan timeout)
{
TaskCompletionSource<IReadOnlyList<T>>? tcs = null;
lock (_syncRoot)
{
if (_messages.Count >= count)
{
return _messages.ToArray();
}
tcs = new TaskCompletionSource<IReadOnlyList<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
_countWaiters.Add(tcs);
}
// Poll: re-check each time any message arrives. The TCS is satisfied on EVERY write,
// but the caller may need more messages, so we loop until the count is met.
while (true)
{
IReadOnlyList<T> snapshot = await tcs.Task.WaitAsync(timeout).ConfigureAwait(false);
if (snapshot.Count >= count)
{
return snapshot;
}
// Not enough yet — register a new waiter and keep waiting.
lock (_syncRoot)
{
if (_messages.Count >= count)
{
return _messages.ToArray();
}
tcs = new TaskCompletionSource<IReadOnlyList<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
_countWaiters.Add(tcs);
}
}
}
}