From f13f35bc79015277fa597dfc5d140255130c3822 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 18 May 2026 21:45:11 -0400 Subject: [PATCH] Resolve IntegrationTests-003..006 code-review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IntegrationTests-003: the live MXAccess smoke test asserted on the first streamed event, which a registration/quality bootstrap event could occupy. The recording writer now waits for the first event matching a predicate (Family == OnDataChange). IntegrationTests-004: the cleanup `finally` could throw and mask an original assertion failure. Shutdown now routes through a helper that logs cleanup exceptions instead of propagating them. IntegrationTests-005: added live MXAccess parity tests — a Write round-trip to an advised item, and an invalid-handle command surfacing the MXAccess failure without a transport fault. IntegrationTests-006: added live LDAP failure-path tests — wrong password (no password leak), unknown username, and server-unreachable. docs/GatewayTesting.md updated to describe the new cases. Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/IntegrationTests/findings.md | 18 +- docs/GatewayTesting.md | 25 +- .../DashboardLdapLiveTests.cs | 63 ++++ .../WorkerLiveMxAccessSmokeTests.cs | 270 ++++++++++++++++-- 4 files changed, 343 insertions(+), 33 deletions(-) diff --git a/code-reviews/IntegrationTests/findings.md b/code-reviews/IntegrationTests/findings.md index 942c301..bb83ae8 100644 --- a/code-reviews/IntegrationTests/findings.md +++ b/code-reviews/IntegrationTests/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-18 | | Commit reviewed | `6c64030` | | Status | Reviewed | -| Open findings | 8 | +| Open findings | 4 | ## Checklist coverage @@ -63,13 +63,13 @@ | Severity | Medium | | Category | Correctness & logic bugs | | Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` | -| Status | Open | +| Status | Resolved | **Description:** The test asserts only on the first `MxEvent` recorded by `RecordingServerStreamWriter`. A live MXAccess provider can deliver an initial state/quality event whose family or handles differ from the expected `OnDataChange` (e.g. a registration-state or bad-quality bootstrap event). Because `WaitForFirstMessageAsync` returns whatever arrives first, a genuine ordering/family defect could fail spuriously or leave later wrong events unverified. **Recommendation:** Filter for the first event with `Family == OnDataChange` (with a bounded retry/poll) or assert the full recorded sequence, so the test verifies the event the worker is supposed to emit. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: Confirmed against source — `WaitForFirstMessageAsync` completed a `TaskCompletionSource` on the very first `WriteAsync`. Replaced it with `RecordingServerStreamWriter.WaitForMessageAsync(predicate, timeout)`, which scans recorded messages, skips earlier non-matching events, and blocks on a `SemaphoreSlim` until a matching one arrives or the timeout elapses (throwing a `TimeoutException` that reports the scanned count). `GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses` now waits for the first `Family == OnDataChange` event. Live execution was not possible in this environment (no MXAccess COM); verified by build. ### IntegrationTests-004 @@ -78,13 +78,13 @@ | Severity | Medium | | Category | Error handling & resilience | | Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` | -| Status | Open | +| Status | Resolved | **Description:** In the `finally` block, after `CloseSessionAsync`, the test does `await streamTask.WaitAsync(StreamShutdownTimeout)`. If closing the session does not promptly complete the stream (or `StreamEvents` itself faults), this throws `TimeoutException` from inside `finally`, which replaces/masks any original assertion failure from the `try` block. The diagnostic value of the real failure is lost. **Recommendation:** Wrap the `streamTask.WaitAsync` (and ideally `WaitForProcessesAsync`) in a try/catch that logs the cleanup exception via `output.WriteLine` instead of letting it propagate. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: Confirmed — the `finally` block awaited `streamTask.WaitAsync` and `WaitForProcessesAsync` with no exception handling. Extracted a shared `ShutDownAsync` helper that wraps the session-close + stream-drain in one try/catch and the worker-process wait in a second try/catch, logging each cleanup exception via `output.WriteLine` instead of throwing. All three live tests now route shutdown through it, so a cleanup timeout can no longer mask an assertion failure. Live execution was not possible in this environment; verified by build. ### IntegrationTests-005 @@ -93,13 +93,13 @@ | Severity | Medium | | Category | Testing coverage | | Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` | -| Status | Open | +| Status | Resolved | **Description:** The only live MXAccess test covers the Register→AddItem→Advise→one-OnDataChange→Close happy path. CLAUDE.md stresses that MXAccess parity is the contract and calls out non-obvious behaviors (`WriteSecured` ordering, `OperationComplete` semantics, invalid-handle exceptions). None of `Write`, `WriteSecured`, `Unadvise`, `RemoveItem`, `Unregister`, `OperationComplete`, an invalid-handle command, or a worker-fault path is exercised against live COM — exactly the paths fake-worker tests cannot validate. **Recommendation:** Add live coverage for at least a `Write` round-trip and an invalid-handle command, plus a worker-fault/abnormal-exit scenario, even if behind additional opt-in env vars. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: Added two `[LiveMxAccessFact]`-gated tests to `WorkerLiveMxAccessSmokeTests`. `GatewaySession_WithLiveWorker_WritesValueToAdvisedItem` registers/adds/advises then issues a `Write` of an integer value, asserting the command round-trips with `ProtocolStatusCode.Ok` and `MxCommandKind.Write`. `GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault` issues `AddItem` against `int.MaxValue` as the server handle (never issued by MXAccess) and asserts the failure surfaces in the command reply without a usable item handle. Both reuse the existing opt-in env var and the `ShutDownAsync` cleanup helper. A worker-fault/abnormal-exit case was deliberately scoped out — it needs a controlled COM crash injection beyond what the existing harness supports; the two added cases cover the `Write` round-trip and invalid-handle paths the recommendation calls out. Live execution was not possible in this environment; verified by build. ### IntegrationTests-006 @@ -108,13 +108,13 @@ | Severity | Medium | | Category | Testing coverage | | Location | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` | -| Status | Open | +| Status | Resolved | **Description:** LDAP live coverage is two cases: admin succeeds, readonly is denied for missing group. There is no coverage of a wrong password for a valid user, an unknown username, or the LDAP-server-unreachable path — all of which `DashboardAuthenticator` has distinct branches for (the `LdapException` catch, the `candidate is null` branch). The negative test only proves group-membership denial, not credential rejection. **Recommendation:** Add a live test for `admin` with a wrong password asserting `Succeeded == false` and that the password is not leaked into `FailureMessage`, and a test for an unknown username. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: Added three `[LiveLdapFact]`-gated tests to `DashboardLdapLiveTests`. `AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword` exercises the `LdapException` catch via a rejected candidate bind and asserts the wrong password never reaches `FailureMessage`. `AuthenticateAsync_UnknownUsername_Fails` exercises the `candidate is null` branch. `AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing` builds the authenticator with `LdapOptions.Port = 1` (a reserved port no LDAP server listens on) and asserts the connect failure is absorbed into a failed result rather than thrown — covering the generic `catch (Exception)` branch. All three are gated by the existing `MXGATEWAY_RUN_LIVE_LDAP_TESTS` opt-in so they stay opt-in. Live execution was not possible in this environment (no live LDAP); verified by build. ### IntegrationTests-007 diff --git a/docs/GatewayTesting.md b/docs/GatewayTesting.md index ad13863..1c53808 100644 --- a/docs/GatewayTesting.md +++ b/docs/GatewayTesting.md @@ -44,9 +44,22 @@ skipped unless `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` is set because it creates the installed MXAccess COM object and depends on live provider state. The live smoke opens a gateway session, launches the x86 worker, runs -`Register`, `AddItem`, and `Advise`, waits a bounded time for one -`OnDataChange`, and closes the session in a `finally` block so the worker gets a -graceful shutdown request even when a command or event assertion fails. +`Register`, `AddItem`, and `Advise`, waits a bounded time for the first +`OnDataChange` event (skipping any earlier bootstrap/registration-state event), +and closes the session in a `finally` block so the worker gets a graceful +shutdown request even when a command or event assertion fails. Cleanup failures +in that `finally` block are logged rather than thrown, so a real assertion +failure is never masked by a shutdown timeout. + +`WorkerLiveMxAccessSmokeTests` additionally covers two MXAccess parity paths the +fake-worker tests cannot validate: + +- a `Write` round-trip against an advised item, and +- an `AddItem` against an invalid server handle, asserting the MXAccess failure + surfaces in the command reply without faulting the gateway transport. + +All three tests are gated by the same `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` +opt-in variable. Build the worker before running the smoke: @@ -119,6 +132,12 @@ GLAuth has only the baseline groups, so this is a hard prerequisite beyond "LDAP is up." See the "Adding a gw-specific group" section of `glauth.md` for the provisioning step that adds `GwAdmin` and grants it to `admin`. +The suite covers both the success path and the `DashboardAuthenticator` failure +branches: `admin` in `GwAdmin` succeeds; `readonly` is denied for missing group; +`admin` with a wrong password is rejected by the candidate bind without leaking +the password into `FailureMessage`; an unknown username yields no candidate; and +an unreachable LDAP server is absorbed into a failed result rather than throwing. + Run the LDAP live tests explicitly: ```bash diff --git a/src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs b/src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs index a63e447..75bb32a 100644 --- a/src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs +++ b/src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs @@ -43,6 +43,69 @@ public sealed class DashboardLdapLiveTests Assert.DoesNotContain("readonly123", result.FailureMessage, StringComparison.Ordinal); } + [LiveLdapFact] + [Trait("Category", "LiveLdap")] + public async Task AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword() + { + // Exercises the LdapException branch: the user exists and the service + // account search succeeds, but the candidate bind is rejected. + const string wrongPassword = "definitely-not-the-admin-password"; + DashboardAuthenticator authenticator = CreateAuthenticator(); + + DashboardAuthenticationResult result = await authenticator.AuthenticateAsync( + "admin", + wrongPassword, + CancellationToken.None); + + Assert.False(result.Succeeded); + Assert.Null(result.Principal); + Assert.DoesNotContain(wrongPassword, result.FailureMessage, StringComparison.Ordinal); + } + + [LiveLdapFact] + [Trait("Category", "LiveLdap")] + public async Task AuthenticateAsync_UnknownUsername_Fails() + { + // Exercises the `candidate is null` branch: the service-account search + // returns no entry, so no candidate bind is attempted. + DashboardAuthenticator authenticator = CreateAuthenticator(); + + DashboardAuthenticationResult result = await authenticator.AuthenticateAsync( + "no-such-user-9f3c1", + "irrelevant-password", + CancellationToken.None); + + Assert.False(result.Succeeded); + Assert.Null(result.Principal); + } + + [LiveLdapFact] + [Trait("Category", "LiveLdap")] + public async Task AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing() + { + // Exercises the connect-failure path: a closed loopback port produces a + // connection error that DashboardAuthenticator must absorb into a Fail + // result rather than propagating an exception to the dashboard. + DashboardAuthenticator authenticator = new( + Options.Create(new GatewayOptions + { + Ldap = new LdapOptions + { + // 1 is a reserved port number that no LDAP server listens on. + Port = 1, + }, + }), + NullLogger.Instance); + + DashboardAuthenticationResult result = await authenticator.AuthenticateAsync( + "admin", + "admin123", + CancellationToken.None); + + Assert.False(result.Succeeded); + Assert.Null(result.Principal); + } + private static DashboardAuthenticator CreateAuthenticator() { return new DashboardAuthenticator( diff --git a/src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs b/src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs index a20fa94..f590fba 100644 --- a/src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs +++ b/src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs @@ -86,8 +86,15 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output) LogReply("Advise", adviseReply); Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); + // A live MXAccess provider can deliver an initial registration-state + // or bad-quality bootstrap event before the OnDataChange the worker + // is contracted to emit. Match on the family rather than trusting + // whatever event arrives first so a genuine ordering defect cannot + // pass spuriously or leave a later wrong event unverified. MxEvent dataChange = await eventWriter - .WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout) + .WaitForMessageAsync( + candidate => candidate.Family == MxEventFamily.OnDataChange, + IntegrationTestEnvironment.LiveMxAccessEventTimeout) .ConfigureAwait(false); LogEvent(dataChange); @@ -98,22 +105,184 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output) } finally { - try - { - if (!string.IsNullOrWhiteSpace(sessionId)) - { - await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false); - } + await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false); + } + } - if (streamTask is not null) + /// + /// Verifies that a Write command round-trips through live MXAccess against an advised item. + /// + [LiveMxAccessFact] + [Trait("Category", "LiveMxAccess")] + public async Task GatewaySession_WithLiveWorker_WritesValueToAdvisedItem() + { + string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); + Assert.True( + File.Exists(workerExecutablePath), + $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); + + TestWorkerProcessFactory processFactory = new(output); + await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output); + + string? sessionId = null; + Task? streamTask = null; + + try + { + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest { - await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false); - } - } - finally + ClientSessionName = "live-mxaccess-write", + ClientCorrelationId = "live-open-write", + CommandTimeout = Duration.FromTimeSpan(CommandTimeout), + }, + new TestServerCallContext()).ConfigureAwait(false); + + sessionId = openReply.SessionId; + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + + RecordingServerStreamWriter eventWriter = new(); + streamTask = fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = sessionId }, + eventWriter, + new TestServerCallContext()); + + MxCommandReply registerReply = await fixture.Service.Invoke( + CreateRegisterRequest(sessionId), + new TestServerCallContext()).ConfigureAwait(false); + LogReply("Register", registerReply); + Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); + + MxCommandReply addItemReply = await fixture.Service.Invoke( + CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle), + new TestServerCallContext()).ConfigureAwait(false); + LogReply("AddItem", addItemReply); + Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); + Assert.True(addItemReply.AddItem.ItemHandle > 0); + + MxCommandReply adviseReply = await fixture.Service.Invoke( + CreateAdviseRequest( + sessionId, + registerReply.Register.ServerHandle, + addItemReply.AddItem.ItemHandle), + new TestServerCallContext()).ConfigureAwait(false); + LogReply("Advise", adviseReply); + Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); + + MxCommandReply writeReply = await fixture.Service.Invoke( + CreateWriteRequest( + sessionId, + registerReply.Register.ServerHandle, + addItemReply.AddItem.ItemHandle), + new TestServerCallContext()).ConfigureAwait(false); + LogReply("Write", writeReply); + + // The gateway must always report a protocol-level status. MXAccess + // parity details (a write rejection, a secured-item failure) belong + // in hresult / statuses, not in a transport failure — the command + // itself completed its round-trip to the worker and back. + Assert.Equal(ProtocolStatusCode.Ok, writeReply.ProtocolStatus.Code); + Assert.Equal(MxCommandKind.Write, writeReply.Kind); + } + finally + { + await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false); + } + } + + /// + /// Verifies that an AddItem against an invalid server handle surfaces the MXAccess failure + /// without faulting the gateway transport, exercising the invalid-handle parity path. + /// + [LiveMxAccessFact] + [Trait("Category", "LiveMxAccess")] + public async Task GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault() + { + string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); + Assert.True( + File.Exists(workerExecutablePath), + $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); + + TestWorkerProcessFactory processFactory = new(output); + await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output); + + string? sessionId = null; + + try + { + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "live-mxaccess-invalid-handle", + ClientCorrelationId = "live-open-invalid", + CommandTimeout = Duration.FromTimeSpan(CommandTimeout), + }, + new TestServerCallContext()).ConfigureAwait(false); + + sessionId = openReply.SessionId; + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + + // Deliberately skip Register: server handle 0x7FFFFFFF was never + // issued by MXAccess. The worker must invoke COM and relay the + // invalid-handle failure rather than the gateway short-circuiting. + MxCommandReply addItemReply = await fixture.Service.Invoke( + CreateAddItemRequest(sessionId, serverHandle: int.MaxValue), + new TestServerCallContext()).ConfigureAwait(false); + LogReply("AddItem(invalid-handle)", addItemReply); + + // MXAccess parity: an invalid handle is an MXAccess-level failure. + // The command still completed its worker round-trip, so the gateway + // protocol status is Ok and the failure shows up in hresult / the + // status proxies — it must not be reported as a transport fault. + Assert.NotEqual(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); + Assert.True( + addItemReply.AddItem is null || addItemReply.AddItem.ItemHandle <= 0, + "Invalid-handle AddItem must not yield a usable item handle."); + } + finally + { + await ShutDownAsync(fixture, processFactory, sessionId, streamTask: null).ConfigureAwait(false); + } + } + + /// + /// Closes the session and drains the event stream / worker processes without letting a + /// cleanup timeout mask the original failure from the test body. + /// + private async Task ShutDownAsync( + GatewayServiceFixture fixture, + TestWorkerProcessFactory processFactory, + string? sessionId, + Task? streamTask) + { + try + { + if (!string.IsNullOrWhiteSpace(sessionId)) { - await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false); + await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false); } + + if (streamTask is not null) + { + await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false); + } + } + catch (Exception ex) + { + // Cleanup runs in a finally block. A TimeoutException (or a faulted + // StreamEvents task) here would otherwise replace any assertion + // failure raised in the try block. Log it and let the original + // failure surface. + output.WriteLine($"Cleanup error during session/stream shutdown: {ex}"); + } + + try + { + await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false); + } + catch (Exception ex) + { + output.WriteLine($"Cleanup error while waiting for worker processes to exit: {ex}"); } } @@ -175,6 +344,32 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output) }; } + private static MxCommandRequest CreateWriteRequest( + string sessionId, + int serverHandle, + int itemHandle) + { + return new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "live-write", + Command = new MxCommand + { + Kind = MxCommandKind.Write, + Write = new WriteCommand + { + ServerHandle = serverHandle, + ItemHandle = itemHandle, + Value = new MxValue + { + DataType = MxDataType.Integer, + Int32Value = 1, + }, + }, + }, + }; + } + private async Task CloseSessionAsync( GatewayServiceFixture fixture, string sessionId) @@ -321,8 +516,8 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output) private sealed class RecordingServerStreamWriter : IServerStreamWriter { private readonly object syncRoot = new(); - private readonly TaskCompletionSource firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly List messages = []; + private readonly SemaphoreSlim messageArrived = new(0); /// /// All messages that have been written to the stream. @@ -344,7 +539,7 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output) public WriteOptions? WriteOptions { get; set; } /// - /// Records the message and completes the first-message task. + /// Records the message and signals any pending waiter. /// /// The message to write. public Task WriteAsync(T message) @@ -354,18 +549,51 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output) messages.Add(message); } - firstMessage.TrySetResult(message); + messageArrived.Release(); return Task.CompletedTask; } /// - /// Waits for the first message up to the specified timeout. + /// Waits for the first recorded message that satisfies , + /// up to the specified timeout. Earlier non-matching messages (for example a + /// registration-state bootstrap event) are skipped rather than treated as the result. /// - /// The maximum time to wait. - /// The first message written to the stream. - public async Task WaitForFirstMessageAsync(TimeSpan timeout) + /// Filter the awaited message must satisfy. + /// The maximum total time to wait. + /// The first message that satisfies the predicate. + public async Task WaitForMessageAsync( + Func predicate, + TimeSpan timeout) { - return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false); + using CancellationTokenSource timeoutCancellation = new(timeout); + int scanned = 0; + + while (true) + { + T[] snapshot; + lock (syncRoot) + { + snapshot = messages.ToArray(); + } + + for (; scanned < snapshot.Length; scanned++) + { + if (predicate(snapshot[scanned])) + { + return snapshot[scanned]; + } + } + + try + { + await messageArrived.WaitAsync(timeoutCancellation.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (timeoutCancellation.IsCancellationRequested) + { + throw new TimeoutException( + $"No stream message satisfied the predicate within {timeout}. Recorded {scanned} message(s)."); + } + } } }