Resolve IntegrationTests-003..006 code-review findings
IntegrationTests-003: the live MXAccess smoke test asserted on the first streamed event, which a registration/quality bootstrap event could occupy. The recording writer now waits for the first event matching a predicate (Family == OnDataChange). IntegrationTests-004: the cleanup `finally` could throw and mask an original assertion failure. Shutdown now routes through a helper that logs cleanup exceptions instead of propagating them. IntegrationTests-005: added live MXAccess parity tests — a Write round-trip to an advised item, and an invalid-handle command surfacing the MXAccess failure without a transport fault. IntegrationTests-006: added live LDAP failure-path tests — wrong password (no password leak), unknown username, and server-unreachable. docs/GatewayTesting.md updated to describe the new cases. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `6c64030` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 8 |
|
||||
| Open findings | 4 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -63,13 +63,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:89-97` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The test asserts only on the first `MxEvent` recorded by `RecordingServerStreamWriter`. A live MXAccess provider can deliver an initial state/quality event whose family or handles differ from the expected `OnDataChange` (e.g. a registration-state or bad-quality bootstrap event). Because `WaitForFirstMessageAsync` returns whatever arrives first, a genuine ordering/family defect could fail spuriously or leave later wrong events unverified.
|
||||
|
||||
**Recommendation:** Filter for the first event with `Family == OnDataChange` (with a bounded retry/poll) or assert the full recorded sequence, so the test verifies the event the worker is supposed to emit.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Confirmed against source — `WaitForFirstMessageAsync` completed a `TaskCompletionSource` on the very first `WriteAsync`. Replaced it with `RecordingServerStreamWriter.WaitForMessageAsync(predicate, timeout)`, which scans recorded messages, skips earlier non-matching events, and blocks on a `SemaphoreSlim` until a matching one arrives or the timeout elapses (throwing a `TimeoutException` that reports the scanned count). `GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses` now waits for the first `Family == OnDataChange` event. Live execution was not possible in this environment (no MXAccess COM); verified by build.
|
||||
|
||||
### IntegrationTests-004
|
||||
|
||||
@@ -78,13 +78,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Error handling & resilience |
|
||||
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs:108-111` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** In the `finally` block, after `CloseSessionAsync`, the test does `await streamTask.WaitAsync(StreamShutdownTimeout)`. If closing the session does not promptly complete the stream (or `StreamEvents` itself faults), this throws `TimeoutException` from inside `finally`, which replaces/masks any original assertion failure from the `try` block. The diagnostic value of the real failure is lost.
|
||||
|
||||
**Recommendation:** Wrap the `streamTask.WaitAsync` (and ideally `WaitForProcessesAsync`) in a try/catch that logs the cleanup exception via `output.WriteLine` instead of letting it propagate.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Confirmed — the `finally` block awaited `streamTask.WaitAsync` and `WaitForProcessesAsync` with no exception handling. Extracted a shared `ShutDownAsync` helper that wraps the session-close + stream-drain in one try/catch and the worker-process wait in a second try/catch, logging each cleanup exception via `output.WriteLine` instead of throwing. All three live tests now route shutdown through it, so a cleanup timeout can no longer mask an assertion failure. Live execution was not possible in this environment; verified by build.
|
||||
|
||||
### IntegrationTests-005
|
||||
|
||||
@@ -93,13 +93,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** The only live MXAccess test covers the Register→AddItem→Advise→one-OnDataChange→Close happy path. CLAUDE.md stresses that MXAccess parity is the contract and calls out non-obvious behaviors (`WriteSecured` ordering, `OperationComplete` semantics, invalid-handle exceptions). None of `Write`, `WriteSecured`, `Unadvise`, `RemoveItem`, `Unregister`, `OperationComplete`, an invalid-handle command, or a worker-fault path is exercised against live COM — exactly the paths fake-worker tests cannot validate.
|
||||
|
||||
**Recommendation:** Add live coverage for at least a `Write` round-trip and an invalid-handle command, plus a worker-fault/abnormal-exit scenario, even if behind additional opt-in env vars.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Added two `[LiveMxAccessFact]`-gated tests to `WorkerLiveMxAccessSmokeTests`. `GatewaySession_WithLiveWorker_WritesValueToAdvisedItem` registers/adds/advises then issues a `Write` of an integer value, asserting the command round-trips with `ProtocolStatusCode.Ok` and `MxCommandKind.Write`. `GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault` issues `AddItem` against `int.MaxValue` as the server handle (never issued by MXAccess) and asserts the failure surfaces in the command reply without a usable item handle. Both reuse the existing opt-in env var and the `ShutDownAsync` cleanup helper. A worker-fault/abnormal-exit case was deliberately scoped out — it needs a controlled COM crash injection beyond what the existing harness supports; the two added cases cover the `Write` round-trip and invalid-handle paths the recommendation calls out. Live execution was not possible in this environment; verified by build.
|
||||
|
||||
### IntegrationTests-006
|
||||
|
||||
@@ -108,13 +108,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `src/MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** LDAP live coverage is two cases: admin succeeds, readonly is denied for missing group. There is no coverage of a wrong password for a valid user, an unknown username, or the LDAP-server-unreachable path — all of which `DashboardAuthenticator` has distinct branches for (the `LdapException` catch, the `candidate is null` branch). The negative test only proves group-membership denial, not credential rejection.
|
||||
|
||||
**Recommendation:** Add a live test for `admin` with a wrong password asserting `Succeeded == false` and that the password is not leaked into `FailureMessage`, and a test for an unknown username.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-18: Added three `[LiveLdapFact]`-gated tests to `DashboardLdapLiveTests`. `AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword` exercises the `LdapException` catch via a rejected candidate bind and asserts the wrong password never reaches `FailureMessage`. `AuthenticateAsync_UnknownUsername_Fails` exercises the `candidate is null` branch. `AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing` builds the authenticator with `LdapOptions.Port = 1` (a reserved port no LDAP server listens on) and asserts the connect failure is absorbed into a failed result rather than thrown — covering the generic `catch (Exception)` branch. All three are gated by the existing `MXGATEWAY_RUN_LIVE_LDAP_TESTS` opt-in so they stay opt-in. Live execution was not possible in this environment (no live LDAP); verified by build.
|
||||
|
||||
### IntegrationTests-007
|
||||
|
||||
|
||||
+22
-3
@@ -44,9 +44,22 @@ skipped unless `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` is set because it creates
|
||||
the installed MXAccess COM object and depends on live provider state.
|
||||
|
||||
The live smoke opens a gateway session, launches the x86 worker, runs
|
||||
`Register`, `AddItem`, and `Advise`, waits a bounded time for one
|
||||
`OnDataChange`, and closes the session in a `finally` block so the worker gets a
|
||||
graceful shutdown request even when a command or event assertion fails.
|
||||
`Register`, `AddItem`, and `Advise`, waits a bounded time for the first
|
||||
`OnDataChange` event (skipping any earlier bootstrap/registration-state event),
|
||||
and closes the session in a `finally` block so the worker gets a graceful
|
||||
shutdown request even when a command or event assertion fails. Cleanup failures
|
||||
in that `finally` block are logged rather than thrown, so a real assertion
|
||||
failure is never masked by a shutdown timeout.
|
||||
|
||||
`WorkerLiveMxAccessSmokeTests` additionally covers two MXAccess parity paths the
|
||||
fake-worker tests cannot validate:
|
||||
|
||||
- a `Write` round-trip against an advised item, and
|
||||
- an `AddItem` against an invalid server handle, asserting the MXAccess failure
|
||||
surfaces in the command reply without faulting the gateway transport.
|
||||
|
||||
All three tests are gated by the same `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1`
|
||||
opt-in variable.
|
||||
|
||||
Build the worker before running the smoke:
|
||||
|
||||
@@ -119,6 +132,12 @@ GLAuth has only the baseline groups, so this is a hard prerequisite beyond "LDAP
|
||||
is up." See the "Adding a gw-specific group" section of `glauth.md` for the
|
||||
provisioning step that adds `GwAdmin` and grants it to `admin`.
|
||||
|
||||
The suite covers both the success path and the `DashboardAuthenticator` failure
|
||||
branches: `admin` in `GwAdmin` succeeds; `readonly` is denied for missing group;
|
||||
`admin` with a wrong password is rejected by the candidate bind without leaking
|
||||
the password into `FailureMessage`; an unknown username yields no candidate; and
|
||||
an unreachable LDAP server is absorbed into a failed result rather than throwing.
|
||||
|
||||
Run the LDAP live tests explicitly:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -43,6 +43,69 @@ public sealed class DashboardLdapLiveTests
|
||||
Assert.DoesNotContain("readonly123", result.FailureMessage, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
[LiveLdapFact]
|
||||
[Trait("Category", "LiveLdap")]
|
||||
public async Task AuthenticateAsync_AdminWithWrongPassword_FailsWithoutLeakingPassword()
|
||||
{
|
||||
// Exercises the LdapException branch: the user exists and the service
|
||||
// account search succeeds, but the candidate bind is rejected.
|
||||
const string wrongPassword = "definitely-not-the-admin-password";
|
||||
DashboardAuthenticator authenticator = CreateAuthenticator();
|
||||
|
||||
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
|
||||
"admin",
|
||||
wrongPassword,
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Null(result.Principal);
|
||||
Assert.DoesNotContain(wrongPassword, result.FailureMessage, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
[LiveLdapFact]
|
||||
[Trait("Category", "LiveLdap")]
|
||||
public async Task AuthenticateAsync_UnknownUsername_Fails()
|
||||
{
|
||||
// Exercises the `candidate is null` branch: the service-account search
|
||||
// returns no entry, so no candidate bind is attempted.
|
||||
DashboardAuthenticator authenticator = CreateAuthenticator();
|
||||
|
||||
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
|
||||
"no-such-user-9f3c1",
|
||||
"irrelevant-password",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Null(result.Principal);
|
||||
}
|
||||
|
||||
[LiveLdapFact]
|
||||
[Trait("Category", "LiveLdap")]
|
||||
public async Task AuthenticateAsync_ServerUnreachable_FailsWithoutThrowing()
|
||||
{
|
||||
// Exercises the connect-failure path: a closed loopback port produces a
|
||||
// connection error that DashboardAuthenticator must absorb into a Fail
|
||||
// result rather than propagating an exception to the dashboard.
|
||||
DashboardAuthenticator authenticator = new(
|
||||
Options.Create(new GatewayOptions
|
||||
{
|
||||
Ldap = new LdapOptions
|
||||
{
|
||||
// 1 is a reserved port number that no LDAP server listens on.
|
||||
Port = 1,
|
||||
},
|
||||
}),
|
||||
NullLogger<DashboardAuthenticator>.Instance);
|
||||
|
||||
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
|
||||
"admin",
|
||||
"admin123",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Null(result.Principal);
|
||||
}
|
||||
|
||||
private static DashboardAuthenticator CreateAuthenticator()
|
||||
{
|
||||
return new DashboardAuthenticator(
|
||||
|
||||
@@ -86,8 +86,15 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
LogReply("Advise", adviseReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
|
||||
|
||||
// A live MXAccess provider can deliver an initial registration-state
|
||||
// or bad-quality bootstrap event before the OnDataChange the worker
|
||||
// is contracted to emit. Match on the family rather than trusting
|
||||
// whatever event arrives first so a genuine ordering defect cannot
|
||||
// pass spuriously or leave a later wrong event unverified.
|
||||
MxEvent dataChange = await eventWriter
|
||||
.WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout)
|
||||
.WaitForMessageAsync(
|
||||
candidate => candidate.Family == MxEventFamily.OnDataChange,
|
||||
IntegrationTestEnvironment.LiveMxAccessEventTimeout)
|
||||
.ConfigureAwait(false);
|
||||
LogEvent(dataChange);
|
||||
|
||||
@@ -98,22 +105,184 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
}
|
||||
finally
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
|
||||
}
|
||||
await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (streamTask is not null)
|
||||
/// <summary>
|
||||
/// Verifies that a Write command round-trips through live MXAccess against an advised item.
|
||||
/// </summary>
|
||||
[LiveMxAccessFact]
|
||||
[Trait("Category", "LiveMxAccess")]
|
||||
public async Task GatewaySession_WithLiveWorker_WritesValueToAdvisedItem()
|
||||
{
|
||||
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
|
||||
Assert.True(
|
||||
File.Exists(workerExecutablePath),
|
||||
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
|
||||
|
||||
TestWorkerProcessFactory processFactory = new(output);
|
||||
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
|
||||
|
||||
string? sessionId = null;
|
||||
Task? streamTask = null;
|
||||
|
||||
try
|
||||
{
|
||||
OpenSessionReply openReply = await fixture.Service.OpenSession(
|
||||
new OpenSessionRequest
|
||||
{
|
||||
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
ClientSessionName = "live-mxaccess-write",
|
||||
ClientCorrelationId = "live-open-write",
|
||||
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
|
||||
},
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
|
||||
sessionId = openReply.SessionId;
|
||||
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
|
||||
|
||||
RecordingServerStreamWriter<MxEvent> eventWriter = new();
|
||||
streamTask = fixture.Service.StreamEvents(
|
||||
new StreamEventsRequest { SessionId = sessionId },
|
||||
eventWriter,
|
||||
new TestServerCallContext());
|
||||
|
||||
MxCommandReply registerReply = await fixture.Service.Invoke(
|
||||
CreateRegisterRequest(sessionId),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Register", registerReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
|
||||
|
||||
MxCommandReply addItemReply = await fixture.Service.Invoke(
|
||||
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("AddItem", addItemReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
|
||||
Assert.True(addItemReply.AddItem.ItemHandle > 0);
|
||||
|
||||
MxCommandReply adviseReply = await fixture.Service.Invoke(
|
||||
CreateAdviseRequest(
|
||||
sessionId,
|
||||
registerReply.Register.ServerHandle,
|
||||
addItemReply.AddItem.ItemHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Advise", adviseReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
|
||||
|
||||
MxCommandReply writeReply = await fixture.Service.Invoke(
|
||||
CreateWriteRequest(
|
||||
sessionId,
|
||||
registerReply.Register.ServerHandle,
|
||||
addItemReply.AddItem.ItemHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Write", writeReply);
|
||||
|
||||
// The gateway must always report a protocol-level status. MXAccess
|
||||
// parity details (a write rejection, a secured-item failure) belong
|
||||
// in hresult / statuses, not in a transport failure — the command
|
||||
// itself completed its round-trip to the worker and back.
|
||||
Assert.Equal(ProtocolStatusCode.Ok, writeReply.ProtocolStatus.Code);
|
||||
Assert.Equal(MxCommandKind.Write, writeReply.Kind);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that an AddItem against an invalid server handle surfaces the MXAccess failure
|
||||
/// without faulting the gateway transport, exercising the invalid-handle parity path.
|
||||
/// </summary>
|
||||
[LiveMxAccessFact]
|
||||
[Trait("Category", "LiveMxAccess")]
|
||||
public async Task GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault()
|
||||
{
|
||||
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
|
||||
Assert.True(
|
||||
File.Exists(workerExecutablePath),
|
||||
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
|
||||
|
||||
TestWorkerProcessFactory processFactory = new(output);
|
||||
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
|
||||
|
||||
string? sessionId = null;
|
||||
|
||||
try
|
||||
{
|
||||
OpenSessionReply openReply = await fixture.Service.OpenSession(
|
||||
new OpenSessionRequest
|
||||
{
|
||||
ClientSessionName = "live-mxaccess-invalid-handle",
|
||||
ClientCorrelationId = "live-open-invalid",
|
||||
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
|
||||
},
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
|
||||
sessionId = openReply.SessionId;
|
||||
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
|
||||
|
||||
// Deliberately skip Register: server handle 0x7FFFFFFF was never
|
||||
// issued by MXAccess. The worker must invoke COM and relay the
|
||||
// invalid-handle failure rather than the gateway short-circuiting.
|
||||
MxCommandReply addItemReply = await fixture.Service.Invoke(
|
||||
CreateAddItemRequest(sessionId, serverHandle: int.MaxValue),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("AddItem(invalid-handle)", addItemReply);
|
||||
|
||||
// MXAccess parity: an invalid handle is an MXAccess-level failure.
|
||||
// The command still completed its worker round-trip, so the gateway
|
||||
// protocol status is Ok and the failure shows up in hresult / the
|
||||
// status proxies — it must not be reported as a transport fault.
|
||||
Assert.NotEqual(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
|
||||
Assert.True(
|
||||
addItemReply.AddItem is null || addItemReply.AddItem.ItemHandle <= 0,
|
||||
"Invalid-handle AddItem must not yield a usable item handle.");
|
||||
}
|
||||
finally
|
||||
{
|
||||
await ShutDownAsync(fixture, processFactory, sessionId, streamTask: null).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Closes the session and drains the event stream / worker processes without letting a
|
||||
/// cleanup timeout mask the original failure from the test body.
|
||||
/// </summary>
|
||||
private async Task ShutDownAsync(
|
||||
GatewayServiceFixture fixture,
|
||||
TestWorkerProcessFactory processFactory,
|
||||
string? sessionId,
|
||||
Task? streamTask)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (streamTask is not null)
|
||||
{
|
||||
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Cleanup runs in a finally block. A TimeoutException (or a faulted
|
||||
// StreamEvents task) here would otherwise replace any assertion
|
||||
// failure raised in the try block. Log it and let the original
|
||||
// failure surface.
|
||||
output.WriteLine($"Cleanup error during session/stream shutdown: {ex}");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
output.WriteLine($"Cleanup error while waiting for worker processes to exit: {ex}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,6 +344,32 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
};
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateWriteRequest(
|
||||
string sessionId,
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "live-write",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Write,
|
||||
Write = new WriteCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
Value = new MxValue
|
||||
{
|
||||
DataType = MxDataType.Integer,
|
||||
Int32Value = 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private async Task CloseSessionAsync(
|
||||
GatewayServiceFixture fixture,
|
||||
string sessionId)
|
||||
@@ -321,8 +516,8 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
|
||||
{
|
||||
private readonly object syncRoot = new();
|
||||
private readonly TaskCompletionSource<T> firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly List<T> messages = [];
|
||||
private readonly SemaphoreSlim messageArrived = new(0);
|
||||
|
||||
/// <summary>
|
||||
/// All messages that have been written to the stream.
|
||||
@@ -344,7 +539,7 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
public WriteOptions? WriteOptions { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Records the message and completes the first-message task.
|
||||
/// Records the message and signals any pending waiter.
|
||||
/// </summary>
|
||||
/// <param name="message">The message to write.</param>
|
||||
public Task WriteAsync(T message)
|
||||
@@ -354,18 +549,51 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
messages.Add(message);
|
||||
}
|
||||
|
||||
firstMessage.TrySetResult(message);
|
||||
messageArrived.Release();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits for the first message up to the specified timeout.
|
||||
/// Waits for the first recorded message that satisfies <paramref name="predicate"/>,
|
||||
/// up to the specified timeout. Earlier non-matching messages (for example a
|
||||
/// registration-state bootstrap event) are skipped rather than treated as the result.
|
||||
/// </summary>
|
||||
/// <param name="timeout">The maximum time to wait.</param>
|
||||
/// <returns>The first message written to the stream.</returns>
|
||||
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
|
||||
/// <param name="predicate">Filter the awaited message must satisfy.</param>
|
||||
/// <param name="timeout">The maximum total time to wait.</param>
|
||||
/// <returns>The first message that satisfies the predicate.</returns>
|
||||
public async Task<T> WaitForMessageAsync(
|
||||
Func<T, bool> predicate,
|
||||
TimeSpan timeout)
|
||||
{
|
||||
return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
|
||||
using CancellationTokenSource timeoutCancellation = new(timeout);
|
||||
int scanned = 0;
|
||||
|
||||
while (true)
|
||||
{
|
||||
T[] snapshot;
|
||||
lock (syncRoot)
|
||||
{
|
||||
snapshot = messages.ToArray();
|
||||
}
|
||||
|
||||
for (; scanned < snapshot.Length; scanned++)
|
||||
{
|
||||
if (predicate(snapshot[scanned]))
|
||||
{
|
||||
return snapshot[scanned];
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await messageArrived.WaitAsync(timeoutCancellation.Token).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException) when (timeoutCancellation.IsCancellationRequested)
|
||||
{
|
||||
throw new TimeoutException(
|
||||
$"No stream message satisfied the predicate within {timeout}. Recorded {scanned} message(s).");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user