diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs index 6370de6..e53f297 100644 --- a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs @@ -128,6 +128,7 @@ public sealed class WorkerClientTests new WorkerClientOptions { EventChannelCapacity = 1, + EventChannelFullModeTimeout = TimeSpan.FromMilliseconds(50), HeartbeatGrace = TimeSpan.FromSeconds(30), HeartbeatCheckInterval = TimeSpan.FromSeconds(30), }); @@ -163,6 +164,7 @@ public sealed class WorkerClientTests new WorkerClientOptions { EventChannelCapacity = 1, + EventChannelFullModeTimeout = TimeSpan.FromMilliseconds(50), HeartbeatGrace = TimeSpan.FromSeconds(30), HeartbeatCheckInterval = TimeSpan.FromSeconds(30), }, @@ -483,10 +485,13 @@ public sealed class WorkerClientTests }); await CompleteHandshakeAsync(client, pipePair); - // Fill the channel plus one to force the overflow path. The gateway - // never opens a StreamEvents consumer so the events stay in the - // bounded channel. - for (ulong sequence = 1; sequence <= 6; sequence++) + // Fill the 4-slot channel and write exactly one more to force the + // overflow path. The gateway never opens a StreamEvents consumer, so + // the events stay buffered. Exactly five events are written: the + // worker client faults while reading the fifth, after which its read + // loop stops — a sixth event would never be drained and its pipe + // write would block forever on a full OS pipe buffer. + for (ulong sequence = 1; sequence <= 5; sequence++) { await pipePair.WorkerWriter.WriteAsync( CreateEventEnvelope(sequence: sequence, MxEventFamily.OnDataChange)); @@ -499,10 +504,13 @@ public sealed class WorkerClientTests Assert.Equal(WorkerClientState.Faulted, client.State); // Reading the events channel after fault throws the propagated - // WorkerClientException carrying the rich diagnostic message. + // WorkerClientException carrying the rich diagnostic message. The + // drain is bounded by TestTimeout so a regression that leaves the + // channel uncompleted fails the test instead of hanging it. + using CancellationTokenSource drainTimeout = new(TestTimeout); WorkerClientException fault = await Assert.ThrowsAsync(async () => { - await foreach (WorkerEvent _ in client.ReadEventsAsync(CancellationToken.None)) + await foreach (WorkerEvent _ in client.ReadEventsAsync(drainTimeout.Token)) { } });