Resolve Tests-007..012 code-review findings

Tests-007: TestServerCallContext and stream-writer/constraint helpers were
copy-pasted across five test files. Consolidated into a shared
MxGateway.Tests.TestSupport namespace; duplicates deleted.

Tests-008: renamed snake_case alarm-test methods to PascalCase
Method_Condition_Result and dropped redundant usings. Re-triaged two
inaccurate sub-claims (the "wnwrap" name and a required CompilerServices
using).

Tests-009: corrected three copy-paste-mismatched XML <summary> comments in
SessionManagerTests.

Tests-010: added the missing anonymous-localhost security negatives —
bypass disallowed, and loopback-allowed from a remote address.

Tests-011: SessionWorkerClientFactoryFakeWorkerTests discarded worker tasks.
The test class now tracks each launcher and observes its task in DisposeAsync.

Tests-012: added xunit.runner.json pinning collection parallelism and
documented the ephemeral-port convention.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 22:59:01 -04:00
parent bd3096533d
commit 9582de077b
17 changed files with 387 additions and 577 deletions
+15 -13
View File
@@ -7,7 +7,7 @@
| Review date | 2026-05-18 |
| Commit reviewed | `6c64030` |
| Status | Reviewed |
| Open findings | 6 |
| Open findings | 0 |
## Checklist coverage
@@ -127,13 +127,13 @@
| Severity | Low |
| Category | Code organization & conventions |
| Location | `src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs:682`, `src/MxGateway.Tests/Gateway/Grpc/GalaxyRepositoryGrpcServiceTests.cs:324`, `src/MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs:460`, `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs:233` |
| Status | Open |
| Status | Resolved |
**Description:** A near-identical `TestServerCallContext` implementation is copy-pasted into at least four test files (and `AllowAllConstraintEnforcer` / `TestServerStreamWriter` / `RecordingStreamWriter` into several). Duplication risks the copies drifting and bloats each file.
**Recommendation:** Extract a shared `TestServerCallContext`, `RecordingServerStreamWriter<T>`, and `AllowAllConstraintEnforcer` into a common test-support folder/namespace.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed five duplicated copies (the brief's four plus a fifth in `Galaxy/GalaxyFilterInputSafetyTests.cs`). Added a shared `MxGateway.Tests.TestSupport` namespace under `src/MxGateway.Tests/TestSupport/`: `TestServerCallContext.cs` (single class with an optional `Metadata? requestHeaders` constructor parameter that subsumes both the no-arg and headers-bearing variants), `RecordingServerStreamWriter.cs` (thread-safe writer with `Messages` and `WaitForFirstMessageAsync`, replacing `TestServerStreamWriter`/`RecordingStreamWriter`/`RecordingServerStreamWriter`), and `AllowAllConstraintEnforcer.cs`. Deleted all five `TestServerCallContext` copies, both `AllowAllConstraintEnforcer` copies, and the three stream-writer copies; updated the five test files to `using MxGateway.Tests.TestSupport;` and renamed `.Items` call sites to `.Messages`. Removed the now-unused `Grpc.Core` using from `GatewayEndToEndFakeWorkerSmokeTests.cs`. Build clean (0 warnings) and suite green.
### Tests-008
@@ -142,13 +142,15 @@
| Severity | Low |
| Category | mxaccessgw conventions |
| Location | `src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs:1-9`, `src/MxGateway.Tests/Gateway/Sessions/NotWiredAlarmRpcDispatcherTests.cs:1-3`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs:1` |
| Status | Open |
| Status | Resolved |
**Description:** The alarm test files diverge from the project's C# style and the rest of the suite: snake_case test method names instead of the PascalCase `Method_Condition_Result` pattern; redundant explicit `using System;`/`System.Threading;` imports despite implicit global usings; and explicit-type `new` instead of target-typed `new()` used elsewhere. There is also a typo in fixture data (`"wnwrap subscribe failed"`).
**Recommendation:** Rename the alarm tests to the house `Method_Condition_Result` convention, drop redundant `System.*` usings, align `new` usage, and fix the `wnwrap` typo.
**Resolution:** _(open)_
**Re-triage note:** Two of the finding's claims are incorrect. (1) `"wnwrap subscribe failed"` is **not a typo**`WnWrap` is the real name of the worker's `WnWrapAlarmConsumer` MXAccess component (`src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs`); the fixture string deliberately references it, so it was left unchanged. (2) `SessionManagerAlarmAutoSubscribeTests.cs` already uses PascalCase `Method_Condition_Result` names and target-typed `new()`, and its lone `using System.Runtime.CompilerServices;` is **required** for `[EnumeratorCancellation]` (not a global using) — it is not redundant. That file needed no change. The genuine style drift was confined to `WorkerAlarmRpcDispatcherTests.cs` and `NotWiredAlarmRpcDispatcherTests.cs`.
**Resolution:** Resolved 2026-05-18: renamed all ten `WorkerAlarmRpcDispatcherTests` methods and both `NotWiredAlarmRpcDispatcherTests` methods from snake_case to the house `Method_Condition_Result` PascalCase convention; dropped the redundant `System`/`System.Collections.Generic`/`System.Linq`/`System.Threading`/`System.Threading.Tasks` usings from `WorkerAlarmRpcDispatcherTests.cs` and `System.Threading`/`System.Threading.Tasks` from `NotWiredAlarmRpcDispatcherTests.cs` (all are implicit global usings), keeping the required `System.Runtime.CompilerServices`; converted explicit-type `new SessionRegistry()`/`new WorkerAlarmRpcDispatcher(...)`/`new FakeAlarmWorkerClient`/`new List<...>()`/`new GatewaySession(...)` to target-typed `new()`; and replaced the fully-qualified `System.StringComparison` with `StringComparison`. See the re-triage note for the two claims not actioned. Suite green.
### Tests-009
@@ -157,13 +159,13 @@
| Severity | Low |
| Category | Documentation & comments |
| Location | `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs:36-37,99,365` |
| Status | Open |
| Status | Resolved |
**Description:** Several XML `<summary>` comments are copy-paste mismatches: the comment above `OpenSessionAsync_SetsInitialDefaultLease` describes correlation-ID generation; the comment above `GatewaySessionSubscribeBulkAsync_ForwardsOneBulkCommand…` describes lease refresh; the comment above `CloseExpiredLeasesAsync_DoesNotCloseActiveEventSubscriber` describes shutdown closing all sessions. Misleading test docs hinder triage.
**Recommendation:** Correct the `<summary>` text to match each test's actual behavior, or remove the redundant comments since the test names already describe the behavior.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed three copy-paste `<summary>` mismatches. The mislabelled comments were the summaries of the *following* tests left attached to the wrong method (the test below each then had no summary). Corrected all three: `OpenSessionAsync_SetsInitialDefaultLease` now describes setting the initial lease expiry; the comment above `InvokeAsync_WhenSessionReady_RefreshesLease` (the finding mis-cited the method name as `GatewaySessionSubscribeBulkAsync_…`) now describes lease refresh on invoke; and `CloseExpiredLeasesAsync_DoesNotCloseActiveEventSubscriber` now describes the expired-lease sweep leaving an active-event-subscriber session open. No behavior change.
### Tests-010
@@ -172,13 +174,13 @@
| Severity | Low |
| Category | Security |
| Location | `src/MxGateway.Tests/Gateway/Dashboard/DashboardAuthorizationHandlerTests.cs:26-36` |
| Status | Open |
| Status | Resolved |
**Description:** The anonymous-localhost bypass is tested only for the success case (`allowAnonymousLocalhost: true` + loopback succeeds) and the remote-unauthenticated denial. There is no test for the security-critical negatives: anonymous + loopback when `AllowAnonymousLocalhost` is `false` must be denied, and anonymous + non-loopback when the flag is `true` must still be denied (the bypass is scoped strictly to loopback). Those are the misconfiguration cases that would expose the dashboard.
**Recommendation:** Add tests: anonymous + loopback + `allowAnonymousLocalhost: false` → not succeeded; anonymous + non-loopback + `allowAnonymousLocalhost: true` → not succeeded.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed the coverage gap and confirmed `DashboardAuthorizationHandler` already gates the bypass correctly on `AllowAnonymousLocalhost && IsLoopbackRequest()` (no product bug). Added two `DashboardAuthorizationHandlerTests`: `HandleAsync_AnonymousLocalhostDisallowed_DoesNotSucceed` (anonymous + loopback + `allowAnonymousLocalhost: false` → not succeeded) and `HandleAsync_AnonymousLocalhostAllowedFromRemoteAddress_DoesNotSucceed` (anonymous + non-loopback + `allowAnonymousLocalhost: true` → not succeeded, proving the bypass stays scoped to loopback). Both pass.
### Tests-011
@@ -187,13 +189,13 @@
| Severity | Low |
| Category | Correctness & logic bugs |
| Location | `src/MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs:233-301` |
| Status | Open |
| Status | Resolved |
**Description:** `GatewayEndToEndFakeWorkerSmokeTests` correctly stores and awaits `launcher.WorkerTask`, but `SessionWorkerClientFactoryFakeWorkerTests` uses `_ = RunWorkerAsync(...)` with no stored task (lines 152, 184, 220). An unhandled exception in the scripted worker becomes an unobserved `TaskException` that can surface as a process-level failure in an unrelated later test rather than failing the owning test.
**Recommendation:** Store the worker task and either await it during disposal or attach a continuation that fails the test on fault, mirroring `GatewayEndToEndFakeWorkerSmokeTests`.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed all three scripted launchers in `SessionWorkerClientFactoryFakeWorkerTests` discarded the worker task. Added an `IWorkerTaskLauncher` interface (each launcher now stores its scripted task in a `WorkerTask` property and exposes `ObserveWorkerTaskAsync`); the test class now implements `IAsyncDisposable`, tracks every launcher it creates via a `Track` helper, and in `DisposeAsync` awaits each `WorkerTask` (within `TestTimeout`) so a scripted-worker fault fails the owning test instead of leaking as an unobserved `TaskScheduler.UnobservedTaskException`. `OperationCanceledException` and `IOException` — the expected outcomes of the worker client tearing the pipe down — are swallowed; anything else rethrows. `NeverReadyWorkerProcessLauncher` (which parks on an infinite `Task.Delay`) was given its own `CancellationTokenSource` so disposal can cancel and observe the parked task. Suite green.
### Tests-012
@@ -202,10 +204,10 @@
| Severity | Low |
| Category | Concurrency & thread safety |
| Location | `src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs:62`, `src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:472` |
| Status | Open |
| Status | Resolved |
**Description:** Pipe names are uniquified per test with a GUID (good), but xUnit runs test classes in parallel by default and there is no `xunit.runner.json` or collection configuration. Tests that build a full `WebApplication` bind ephemeral ports (`--urls=http://127.0.0.1:0`, fine) but spin up DI containers and hosted services concurrently. Currently safe, but a future test binding a fixed port would silently collide.
**Recommendation:** Add an `xunit.runner.json` or a collection grouping the `WebApplication`-building tests, and keep the `:0` ephemeral-port convention explicit so future tests do not introduce a fixed-port collision.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: added `src/MxGateway.Tests/xunit.runner.json` making the parallelism policy explicit (`parallelizeTestCollections: true`, `maxParallelThreads: -1`, `parallelizeAssembly: false`, `longRunningTestSeconds: 30`) and wired it into `MxGateway.Tests.csproj` as `<None Update="xunit.runner.json" CopyToOutputDirectory="PreserveNewest" />` so the runner picks it up (confirmed present in `bin/Debug/net10.0/`). Added a comment at the only `WebApplication`-building call site (`GatewayApplicationTests.cs`, `--urls=http://127.0.0.1:0`) documenting that the ephemeral-port (`:0`) convention is mandatory because test collections run in parallel. No fixed-port binding exists today; this is a preventative guardrail as the finding recommends.
@@ -6,6 +6,7 @@ using MxGateway.Server.Dashboard;
using MxGateway.Server.Galaxy;
using MxGateway.Server.Grpc;
using MxGateway.Server.Security.Authorization;
using MxGateway.Tests.TestSupport;
namespace MxGateway.Tests.Galaxy;
@@ -302,51 +303,4 @@ public sealed class GalaxyFilterInputSafetyTests
public Task WaitForFirstLoadAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata requestHeaders = [];
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
protected override string MethodCore => "/galaxy_repository.v1.GalaxyRepository/DiscoverHierarchy";
protected override string HostCore => "localhost";
protected override string PeerCore => "ipv4:127.0.0.1:5000";
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore
{
get => status;
set => status = value;
}
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
protected override IDictionary<object, object> UserStateCore => userState;
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) => Task.CompletedTask;
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
}
@@ -35,6 +35,36 @@ public sealed class DashboardAuthorizationHandlerTests
Assert.True(context.HasSucceeded);
}
/// <summary>
/// Verifies that the anonymous-localhost bypass is denied when <c>AllowAnonymousLocalhost</c>
/// is off, even on a loopback connection — the misconfiguration must not expose the dashboard.
/// </summary>
[Fact]
public async Task HandleAsync_AnonymousLocalhostDisallowed_DoesNotSucceed()
{
AuthorizationHandlerContext context = await AuthorizeAsync(
new ClaimsPrincipal(new ClaimsIdentity()),
IPAddress.Loopback,
allowAnonymousLocalhost: false);
Assert.False(context.HasSucceeded);
}
/// <summary>
/// Verifies that the anonymous-localhost bypass stays scoped to loopback: an anonymous
/// request from a non-loopback address is denied even when <c>AllowAnonymousLocalhost</c> is on.
/// </summary>
[Fact]
public async Task HandleAsync_AnonymousLocalhostAllowedFromRemoteAddress_DoesNotSucceed()
{
AuthorizationHandlerContext context = await AuthorizeAsync(
new ClaimsPrincipal(new ClaimsIdentity()),
IPAddress.Parse("10.0.0.5"),
allowAnonymousLocalhost: true);
Assert.False(context.HasSucceeded);
}
/// <summary>Verifies that authenticated users without admin scope fail authorization.</summary>
[Fact]
public async Task HandleAsync_AuthenticatedWithoutAdminScope_DoesNotSucceed()
@@ -147,6 +147,8 @@ public sealed class GatewayApplicationTests
string value,
string expectedFailure)
{
// Bind an ephemeral port (:0) — xUnit runs test collections in parallel, so any
// WebApplication-building test must avoid a fixed port to prevent a bind collision.
await using WebApplication app = GatewayApplication.Build(
[$"--{key}={value}", "--urls=http://127.0.0.1:0"]);
@@ -1,6 +1,5 @@
using System.Collections.Concurrent;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
@@ -13,6 +12,7 @@ using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
using MxGateway.Tests.Gateway.Workers.Fakes;
using MxGateway.Tests.TestSupport;
namespace MxGateway.Tests.Gateway;
@@ -405,159 +405,4 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
}
}
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
{
private readonly object _syncRoot = new();
private readonly TaskCompletionSource<T> _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly List<T> _messages = [];
/// <summary>
/// Gets the recorded messages written to this stream.
/// </summary>
public IReadOnlyList<T> Messages
{
get
{
lock (_syncRoot)
{
return _messages.ToArray();
}
}
}
/// <summary>
/// Gets or sets options for writing messages to the stream.
/// </summary>
public WriteOptions? WriteOptions { get; set; }
/// <summary>
/// Writes a message to the stream asynchronously.
/// </summary>
/// <param name="message">The message to write.</param>
/// <returns>Completed task.</returns>
public Task WriteAsync(T message)
{
lock (_syncRoot)
{
_messages.Add(message);
}
_firstMessage.TrySetResult(message);
return Task.CompletedTask;
}
/// <summary>
/// Waits for the first message to be written within the specified timeout.
/// </summary>
/// <param name="timeout">Maximum time to wait for the first message.</param>
/// <returns>The first message written to this stream.</returns>
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
{
return await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
}
}
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata _requestHeaders = [];
private readonly Metadata _responseTrailers = [];
private readonly Dictionary<object, object> _userState = [];
private Status _status;
private WriteOptions? _writeOptions;
/// <inheritdoc />
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
/// <inheritdoc />
protected override string HostCore => "localhost";
/// <inheritdoc />
protected override string PeerCore => "ipv4:127.0.0.1:5000";
/// <inheritdoc />
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
/// <inheritdoc />
protected override Metadata RequestHeadersCore => _requestHeaders;
/// <inheritdoc />
protected override CancellationToken CancellationTokenCore => cancellationToken;
/// <inheritdoc />
protected override Metadata ResponseTrailersCore => _responseTrailers;
/// <inheritdoc />
protected override Status StatusCore
{
get => _status;
set => _status = value;
}
/// <inheritdoc />
protected override WriteOptions? WriteOptionsCore
{
get => _writeOptions;
set => _writeOptions = value;
}
/// <inheritdoc />
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
/// <inheritdoc />
protected override IDictionary<object, object> UserStateCore => _userState;
/// <summary>
/// Writes response headers asynchronously.
/// </summary>
/// <param name="responseHeaders">Headers to write.</param>
/// <returns>Completed task.</returns>
/// <inheritdoc />
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return Task.CompletedTask;
}
/// <summary>
/// Creates a context propagation token with the specified options.
/// </summary>
/// <param name="options">Propagation options.</param>
/// <returns>Propagation token.</returns>
/// <inheritdoc />
protected override ContextPropagationToken CreatePropagationTokenCore(
ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
{
public Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken) => Task.CompletedTask;
}
}
@@ -5,6 +5,7 @@ using MxGateway.Server.Dashboard;
using MxGateway.Server.Galaxy;
using MxGateway.Server.Grpc;
using MxGateway.Server.Security.Authorization;
using MxGateway.Tests.TestSupport;
namespace MxGateway.Tests.Gateway.Grpc;
@@ -321,51 +322,4 @@ public sealed class GalaxyRepositoryGrpcServiceTests
public Task WaitForFirstLoadAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata requestHeaders = [];
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
protected override string MethodCore => "/galaxy_repository.v1.GalaxyRepository/DiscoverHierarchy";
protected override string HostCore => "localhost";
protected override string PeerCore => "ipv4:127.0.0.1:5000";
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore
{
get => status;
set => status = value;
}
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
protected override IDictionary<object, object> UserStateCore => userState;
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) => Task.CompletedTask;
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
}
@@ -11,6 +11,7 @@ using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
using MxGateway.Tests.TestSupport;
namespace MxGateway.Tests.Gateway.Grpc;
@@ -132,7 +133,7 @@ public sealed class MxAccessGatewayServiceTests
SessionId = "session-missing",
AlarmFilterPrefix = "Tank01.",
},
new RecordingStreamWriter<ActiveAlarmSnapshot>(),
new RecordingServerStreamWriter<ActiveAlarmSnapshot>(),
new TestServerCallContext()));
Assert.Equal(StatusCode.NotFound, exception.StatusCode);
@@ -225,7 +226,7 @@ public sealed class MxAccessGatewayServiceTests
sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 1));
sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 2));
MxAccessGatewayService service = CreateService(sessionManager);
TestServerStreamWriter<MxEvent> writer = new();
RecordingServerStreamWriter<MxEvent> writer = new();
await service.StreamEvents(
new StreamEventsRequest
@@ -276,7 +277,7 @@ public sealed class MxAccessGatewayServiceTests
FakeSessionManager sessionManager = new();
sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 2));
MxAccessGatewayService service = CreateService(sessionManager, metrics: metrics);
TestServerStreamWriter<MxEvent> writer = new();
RecordingServerStreamWriter<MxEvent> writer = new();
await service.StreamEvents(
new StreamEventsRequest { SessionId = "session-1" },
@@ -375,7 +376,7 @@ public sealed class MxAccessGatewayServiceTests
RpcException exception = await Assert.ThrowsAsync<RpcException>(
async () => await service.QueryActiveAlarms(
new QueryActiveAlarmsRequest(),
new RecordingStreamWriter<ActiveAlarmSnapshot>(),
new RecordingServerStreamWriter<ActiveAlarmSnapshot>(),
new TestServerCallContext()));
Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
@@ -386,7 +387,7 @@ public sealed class MxAccessGatewayServiceTests
public async Task QueryActiveAlarms_WithValidRequest_StreamsZeroSnapshots()
{
MxAccessGatewayService service = CreateService(new FakeSessionManager());
RecordingStreamWriter<ActiveAlarmSnapshot> sink = new();
RecordingServerStreamWriter<ActiveAlarmSnapshot> sink = new();
await service.QueryActiveAlarms(
new QueryActiveAlarmsRequest
@@ -397,7 +398,7 @@ public sealed class MxAccessGatewayServiceTests
sink,
new TestServerCallContext());
Assert.Empty(sink.Items);
Assert.Empty(sink.Messages);
}
/// <summary>Verifies OpenSession advertises the alarm RPC capability strings.</summary>
@@ -664,35 +665,6 @@ public sealed class MxAccessGatewayServiceTests
}
}
private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
{
public Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
public Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class FakeWorkerClient(int processId) : IWorkerClient
{
/// <inheritdoc />
@@ -750,97 +722,4 @@ public sealed class MxAccessGatewayServiceTests
}
}
private sealed class TestServerStreamWriter<T> : IServerStreamWriter<T>
{
/// <inheritdoc />
public List<T> Messages { get; } = [];
/// <inheritdoc />
public WriteOptions? WriteOptions { get; set; }
/// <inheritdoc />
public Task WriteAsync(T message)
{
Messages.Add(message);
return Task.CompletedTask;
}
}
private sealed class RecordingStreamWriter<T> : IServerStreamWriter<T>
{
public List<T> Items { get; } = new();
public WriteOptions? WriteOptions { get; set; }
public Task WriteAsync(T message)
{
Items.Add(message);
return Task.CompletedTask;
}
}
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata requestHeaders = [];
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
/// <inheritdoc />
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
/// <inheritdoc />
protected override string HostCore => "localhost";
/// <inheritdoc />
protected override string PeerCore => "ipv4:127.0.0.1:5000";
/// <inheritdoc />
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
/// <inheritdoc />
protected override Metadata RequestHeadersCore => requestHeaders;
/// <inheritdoc />
protected override CancellationToken CancellationTokenCore => cancellationToken;
/// <inheritdoc />
protected override Metadata ResponseTrailersCore => responseTrailers;
/// <inheritdoc />
protected override Status StatusCore
{
get => status;
set => status = value;
}
/// <inheritdoc />
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
/// <inheritdoc />
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
/// <inheritdoc />
protected override IDictionary<object, object> UserStateCore => userState;
/// <inheritdoc />
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return Task.CompletedTask;
}
/// <inheritdoc />
protected override ContextPropagationToken CreatePropagationTokenCore(
ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
}
@@ -1,5 +1,3 @@
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Sessions;
@@ -15,7 +13,7 @@ namespace MxGateway.Tests.Gateway.Sessions;
public sealed class NotWiredAlarmRpcDispatcherTests
{
[Fact]
public async Task AcknowledgeAsync_returns_ok_with_worker_pending_diagnostic()
public async Task AcknowledgeAsync_WhenNotWired_ReturnsOkWithWorkerPendingDiagnostic()
{
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
@@ -33,11 +31,11 @@ public sealed class NotWiredAlarmRpcDispatcherTests
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.Equal("session-1", reply.SessionId);
Assert.Equal("corr-1", reply.CorrelationId);
Assert.Contains("worker", reply.DiagnosticMessage, System.StringComparison.OrdinalIgnoreCase);
Assert.Contains("worker", reply.DiagnosticMessage, StringComparison.OrdinalIgnoreCase);
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_no_snapshots()
public async Task QueryActiveAlarmsAsync_WhenNotWired_YieldsNoSnapshots()
{
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
@@ -33,7 +33,7 @@ public sealed class SessionManagerTests
Assert.Equal(1, metrics.GetSnapshot().SessionsOpened);
}
/// <summary>Verifies that opening a session generates a correlation ID from the client name and session ID.</summary>
/// <summary>Verifies that opening a session sets the initial lease expiry from the configured default lease.</summary>
[Fact]
public async Task OpenSessionAsync_SetsInitialDefaultLease()
{
@@ -96,7 +96,7 @@ public sealed class SessionManagerTests
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
}
/// <summary>Verifies that bulk subscribe forwards the command and returns subscription results.</summary>
/// <summary>Verifies that invoking a command on a ready session refreshes its lease expiry.</summary>
[Fact]
public async Task InvokeAsync_WhenSessionReady_RefreshesLease()
{
@@ -362,7 +362,7 @@ public sealed class SessionManagerTests
Assert.Equal(0, activeClient.ShutdownCount);
}
/// <summary>Verifies that shutdown closes all registered sessions.</summary>
/// <summary>Verifies that an expired-lease sweep leaves a session with an active event subscriber open.</summary>
[Fact]
public async Task CloseExpiredLeasesAsync_DoesNotCloseActiveEventSubscriber()
{
@@ -10,15 +10,29 @@ using MxGateway.Tests.Gateway.Workers.Fakes;
namespace MxGateway.Tests.Gateway.Sessions;
public sealed class SessionWorkerClientFactoryFakeWorkerTests
public sealed class SessionWorkerClientFactoryFakeWorkerTests : IAsyncDisposable
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
private readonly List<IWorkerTaskLauncher> _launchers = [];
/// <summary>
/// Awaits every scripted worker task so an unhandled exception fails the owning test
/// instead of surfacing later as an unobserved <see cref="TaskScheduler.UnobservedTaskException"/>.
/// </summary>
public async ValueTask DisposeAsync()
{
foreach (IWorkerTaskLauncher launcher in _launchers)
{
await launcher.ObserveWorkerTaskAsync(TestTimeout);
}
}
/// <summary>Verifies that the factory creates a ready worker client with a scripted fake worker process.</summary>
[Fact]
public async Task CreateAsync_WithScriptedFakeWorker_ReturnsReadyClient()
{
ScriptedFakeWorkerProcessLauncher launcher = new();
ScriptedFakeWorkerProcessLauncher launcher = Track(new ScriptedFakeWorkerProcessLauncher());
using GatewayMetrics metrics = new();
SessionWorkerClientFactory factory = new(
launcher,
@@ -51,7 +65,7 @@ public sealed class SessionWorkerClientFactoryFakeWorkerTests
[Fact]
public async Task CreateAsync_WhenFakeWorkerStartupFails_ThrowsWorkerClientException()
{
FailingStartupWorkerProcessLauncher launcher = new();
FailingStartupWorkerProcessLauncher launcher = Track(new FailingStartupWorkerProcessLauncher());
using GatewayMetrics metrics = new();
SessionWorkerClientFactory factory = new(
launcher,
@@ -71,7 +85,7 @@ public sealed class SessionWorkerClientFactoryFakeWorkerTests
[Fact]
public async Task CreateAsync_WhenFakeWorkerNeverSendsReady_TimesOutAndKillsWorker()
{
NeverReadyWorkerProcessLauncher launcher = new();
NeverReadyWorkerProcessLauncher launcher = Track(new NeverReadyWorkerProcessLauncher());
using GatewayMetrics metrics = new();
SessionWorkerClientFactory factory = new(
launcher,
@@ -134,8 +148,50 @@ public sealed class SessionWorkerClientFactoryFakeWorkerTests
};
}
private T Track<T>(T launcher)
where T : IWorkerTaskLauncher
{
_launchers.Add(launcher);
return launcher;
}
/// <summary>
/// A fake worker launcher that runs a scripted worker on a background task and exposes
/// that task so the owning test observes it rather than leaking an unobserved fault.
/// </summary>
private interface IWorkerTaskLauncher : IWorkerProcessLauncher
{
/// <summary>
/// Awaits the scripted worker task within the timeout, swallowing only the pipe
/// teardown faults expected when the worker client kills or disposes the worker.
/// </summary>
/// <param name="timeout">Maximum time to wait for the worker task.</param>
Task ObserveWorkerTaskAsync(TimeSpan timeout);
}
/// <summary>
/// Awaits a scripted worker task, treating cancellation and pipe-disconnect I/O faults as
/// the expected outcome of the worker client tearing the worker down, and rethrowing anything else.
/// </summary>
private static async Task ObserveWorkerTaskAsync(Task workerTask, TimeSpan timeout)
{
try
{
await workerTask.WaitAsync(timeout).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Expected: the worker client cancelled the scripted worker during teardown.
}
catch (IOException)
{
// Expected: the gateway pipe was closed when the worker client disposed.
}
}
/// <summary>Fake worker launcher that connects a scripted fake worker harness.</summary>
private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerProcessLauncher
private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerTaskLauncher
{
/// <summary>The fake process ID used by the scripted launcher.</summary>
public const int ProcessId = 2468;
@@ -144,16 +200,23 @@ public sealed class SessionWorkerClientFactoryFakeWorkerTests
/// <summary>Gets the connected fake worker harness.</summary>
public FakeWorkerHarness? Harness { get; private set; }
/// <summary>Gets the scripted worker task.</summary>
public Task WorkerTask { get; private set; } = Task.CompletedTask;
/// <inheritdoc />
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
_ = RunWorkerAsync(request, cancellationToken);
WorkerTask = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(CreateHandle(_process));
}
/// <inheritdoc />
public Task ObserveWorkerTaskAsync(TimeSpan timeout) =>
SessionWorkerClientFactoryFakeWorkerTests.ObserveWorkerTaskAsync(WorkerTask, timeout);
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
@@ -169,21 +232,28 @@ public sealed class SessionWorkerClientFactoryFakeWorkerTests
}
/// <summary>Fake worker launcher that fails during startup with protocol version mismatch.</summary>
private sealed class FailingStartupWorkerProcessLauncher : IWorkerProcessLauncher
private sealed class FailingStartupWorkerProcessLauncher : IWorkerTaskLauncher
{
/// <summary>Gets the fake worker process.</summary>
public FakeWorkerProcess Process { get; } = new(processId: 3579);
/// <summary>Gets the scripted worker task.</summary>
public Task WorkerTask { get; private set; } = Task.CompletedTask;
/// <inheritdoc />
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
_ = RunWorkerAsync(request, cancellationToken);
WorkerTask = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(CreateHandle(Process));
}
/// <inheritdoc />
public Task ObserveWorkerTaskAsync(TimeSpan timeout) =>
SessionWorkerClientFactoryFakeWorkerTests.ObserveWorkerTaskAsync(WorkerTask, timeout);
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
@@ -203,37 +273,52 @@ public sealed class SessionWorkerClientFactoryFakeWorkerTests
}
/// <summary>Fake worker launcher that never completes startup, simulating a hung worker.</summary>
private sealed class NeverReadyWorkerProcessLauncher : IWorkerProcessLauncher
private sealed class NeverReadyWorkerProcessLauncher : IWorkerTaskLauncher
{
private readonly CancellationTokenSource _stop = new();
/// <summary>Gets the fake worker process.</summary>
public FakeWorkerProcess Process { get; } = new(processId: 4680);
/// <summary>Gets the scripted worker task.</summary>
public Task WorkerTask { get; private set; } = Task.CompletedTask;
/// <inheritdoc />
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
_ = RunWorkerAsync(request, cancellationToken);
WorkerTask = RunWorkerAsync(request);
return Task.FromResult(CreateHandle(Process));
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
/// <inheritdoc />
public async Task ObserveWorkerTaskAsync(TimeSpan timeout)
{
// The scripted worker parks on an infinite delay; cancel it so disposal observes
// the task instead of leaking it as an unobserved fault.
await _stop.CancelAsync().ConfigureAwait(false);
await SessionWorkerClientFactoryFakeWorkerTests
.ObserveWorkerTaskAsync(WorkerTask, timeout)
.ConfigureAwait(false);
_stop.Dispose();
}
private async Task RunWorkerAsync(WorkerProcessLaunchRequest request)
{
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
_ = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
cancellationToken: _stop.Token).ConfigureAwait(false);
_ = await harness.ReadGatewayEnvelopeAsync(_stop.Token).ConfigureAwait(false);
await harness.SendWorkerHelloAsync(
workerProcessId: Process.Id,
workerProtocolVersion: request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
cancellationToken: _stop.Token).ConfigureAwait(false);
await Task.Delay(Timeout.InfiniteTimeSpan, _stop.Token).ConfigureAwait(false);
}
}
@@ -1,9 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
@@ -19,10 +14,10 @@ namespace MxGateway.Tests.Gateway.Sessions;
public sealed class WorkerAlarmRpcDispatcherTests
{
[Fact]
public async Task AcknowledgeAsync_returns_session_not_found_when_session_missing()
public async Task AcknowledgeAsync_WhenSessionMissing_ReturnsSessionNotFound()
{
SessionRegistry registry = new SessionRegistry();
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
SessionRegistry registry = new();
WorkerAlarmRpcDispatcher dispatcher = new(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
@@ -37,11 +32,11 @@ public sealed class WorkerAlarmRpcDispatcherTests
}
[Fact]
public async Task AcknowledgeAsync_forwards_guid_and_returns_native_status()
public async Task AcknowledgeAsync_WithGuidReference_ForwardsGuidAndReturnsNativeStatus()
{
SessionRegistry registry = new SessionRegistry();
SessionRegistry registry = new();
Guid alarmGuid = Guid.NewGuid();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
FakeAlarmWorkerClient worker = new()
{
ReplyFactory = command =>
{
@@ -63,7 +58,7 @@ public sealed class WorkerAlarmRpcDispatcherTests
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
WorkerAlarmRpcDispatcher dispatcher = new(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
@@ -84,10 +79,10 @@ public sealed class WorkerAlarmRpcDispatcherTests
}
[Fact]
public async Task AcknowledgeAsync_propagates_worker_diagnostic_on_failure()
public async Task AcknowledgeAsync_WhenWorkerFails_PropagatesWorkerDiagnostic()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
SessionRegistry registry = new();
FakeAlarmWorkerClient worker = new()
{
ReplyFactory = _ => new MxCommandReply
{
@@ -106,7 +101,7 @@ public sealed class WorkerAlarmRpcDispatcherTests
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
WorkerAlarmRpcDispatcher dispatcher = new(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
@@ -125,7 +120,7 @@ public sealed class WorkerAlarmRpcDispatcherTests
[InlineData("Galaxy!TestArea.TestMachine_001.TestAlarm001", "Galaxy", "TestArea", "TestMachine_001.TestAlarm001")]
[InlineData("Galaxy!Area.Tag", "Galaxy", "Area", "Tag")]
[InlineData("Provider!Group.Tag.With.Dots", "Provider", "Group", "Tag.With.Dots")]
public void TryParseAlarmReference_decomposes_provider_group_tag(
public void TryParseAlarmReference_WithProviderGroupTag_DecomposesParts(
string reference, string expectedProvider, string expectedGroup, string expectedName)
{
Assert.True(WorkerAlarmRpcDispatcher.TryParseAlarmReference(
@@ -145,18 +140,18 @@ public sealed class WorkerAlarmRpcDispatcherTests
[InlineData("Galaxy!Group")] // missing dot
[InlineData("Galaxy!.Tag")] // empty group
[InlineData("Galaxy!Group.")] // empty tag
public void TryParseAlarmReference_rejects_malformed_references(string? reference)
public void TryParseAlarmReference_WithMalformedReference_ReturnsFalse(string? reference)
{
Assert.False(WorkerAlarmRpcDispatcher.TryParseAlarmReference(
reference, out _, out _, out _));
}
[Fact]
public async Task AcknowledgeAsync_routes_provider_group_tag_via_AckByName()
public async Task AcknowledgeAsync_WithProviderGroupTagReference_RoutesViaAckByName()
{
SessionRegistry registry = new SessionRegistry();
SessionRegistry registry = new();
AcknowledgeAlarmByNameCommand? observed = null;
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
FakeAlarmWorkerClient worker = new()
{
ReplyFactory = command =>
{
@@ -176,7 +171,7 @@ public sealed class WorkerAlarmRpcDispatcherTests
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
WorkerAlarmRpcDispatcher dispatcher = new(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
@@ -199,16 +194,16 @@ public sealed class WorkerAlarmRpcDispatcherTests
}
[Fact]
public async Task AcknowledgeAsync_returns_invalid_request_for_unparseable_reference()
public async Task AcknowledgeAsync_WithUnparseableReference_ReturnsInvalidRequest()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient();
SessionRegistry registry = new();
FakeAlarmWorkerClient worker = new();
GatewaySession session = NewSession("s1");
session.AttachWorkerClient(worker);
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
WorkerAlarmRpcDispatcher dispatcher = new(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
@@ -223,10 +218,10 @@ public sealed class WorkerAlarmRpcDispatcherTests
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_each_snapshot_from_payload()
public async Task QueryActiveAlarmsAsync_WithPayloadSnapshots_YieldsEachSnapshot()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
SessionRegistry registry = new();
FakeAlarmWorkerClient worker = new()
{
ReplyFactory = command =>
{
@@ -257,9 +252,9 @@ public sealed class WorkerAlarmRpcDispatcherTests
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
WorkerAlarmRpcDispatcher dispatcher = new(registry);
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
List<ActiveAlarmSnapshot> collected = new();
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "s1" },
CancellationToken.None))
@@ -273,12 +268,12 @@ public sealed class WorkerAlarmRpcDispatcherTests
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_empty_when_session_missing()
public async Task QueryActiveAlarmsAsync_WhenSessionMissing_YieldsEmpty()
{
SessionRegistry registry = new SessionRegistry();
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
SessionRegistry registry = new();
WorkerAlarmRpcDispatcher dispatcher = new(registry);
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
List<ActiveAlarmSnapshot> collected = new();
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "missing" },
CancellationToken.None))
@@ -290,10 +285,10 @@ public sealed class WorkerAlarmRpcDispatcherTests
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_empty_on_worker_failure()
public async Task QueryActiveAlarmsAsync_WhenWorkerFails_YieldsEmpty()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
SessionRegistry registry = new();
FakeAlarmWorkerClient worker = new()
{
ReplyFactory = _ => new MxCommandReply
{
@@ -310,9 +305,9 @@ public sealed class WorkerAlarmRpcDispatcherTests
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
WorkerAlarmRpcDispatcher dispatcher = new(registry);
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
List<ActiveAlarmSnapshot> collected = new();
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "s1" },
CancellationToken.None))
@@ -325,7 +320,7 @@ public sealed class WorkerAlarmRpcDispatcherTests
private static GatewaySession NewSession(string sessionId)
{
return new GatewaySession(
return new(
sessionId,
"mxaccess",
$"mxaccess-gateway-1-{sessionId}",
@@ -16,6 +16,13 @@
<Using Include="Xunit" />
</ItemGroup>
<ItemGroup>
<!-- Makes the xUnit parallelism policy explicit (Tests-012): test collections run in
parallel, so WebApplication-building tests must keep binding ephemeral ports
(http://127.0.0.1:0) to avoid a future fixed-port collision. -->
<None Update="xunit.runner.json" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
<ProjectReference Include="..\MxGateway.Server\MxGateway.Server.csproj" />
@@ -10,6 +10,7 @@ using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Tests.TestSupport;
namespace MxGateway.Tests.Security.Authorization;
@@ -107,7 +108,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
RpcException exception = await Assert.ThrowsAsync<RpcException>(
() => interceptor.ServerStreamingServerHandler(
new StreamEventsRequest(),
new TestServerStreamWriter<MxEvent>(),
new RecordingServerStreamWriter<MxEvent>(),
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
(_, _, _) => Task.CompletedTask));
@@ -123,7 +124,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.EventsRead)),
identityAccessor);
TestServerStreamWriter<MxEvent> streamWriter = new();
RecordingServerStreamWriter<MxEvent> streamWriter = new();
await interceptor.ServerStreamingServerHandler(
new StreamEventsRequest(),
@@ -396,40 +397,6 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
}
}
/// <summary>Constraint enforcer that permits every operation for composition tests.</summary>
private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
{
/// <inheritdoc />
public Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class FakeApiKeyVerifier(ApiKeyVerificationResult result) : IApiKeyVerifier
{
/// <summary>Gets whether the verifier was called.</summary>
@@ -453,88 +420,4 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
}
}
private sealed class TestServerStreamWriter<T> : IServerStreamWriter<T>
{
/// <summary>Gets messages written to the stream.</summary>
public List<T> Messages { get; } = [];
/// <summary>Gets or sets write options for the stream.</summary>
public WriteOptions? WriteOptions { get; set; }
/// <summary>Writes a message to the stream.</summary>
/// <param name="message">The message to write.</param>
/// <returns>Task representing the write operation.</returns>
public Task WriteAsync(T message)
{
Messages.Add(message);
return Task.CompletedTask;
}
}
private sealed class TestServerCallContext(
Metadata requestHeaders,
CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
/// <inheritdoc />
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
/// <inheritdoc />
protected override string HostCore => "localhost";
/// <inheritdoc />
protected override string PeerCore => "ipv4:127.0.0.1:5000";
/// <inheritdoc />
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
/// <inheritdoc />
protected override Metadata RequestHeadersCore => requestHeaders;
/// <inheritdoc />
protected override CancellationToken CancellationTokenCore => cancellationToken;
/// <inheritdoc />
protected override Metadata ResponseTrailersCore => responseTrailers;
/// <inheritdoc />
protected override Status StatusCore
{
get => status;
set => status = value;
}
/// <inheritdoc />
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
/// <inheritdoc />
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
/// <inheritdoc />
protected override IDictionary<object, object> UserStateCore => userState;
/// <inheritdoc />
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return Task.CompletedTask;
}
/// <inheritdoc />
protected override ContextPropagationToken CreatePropagationTokenCore(
ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
}
@@ -0,0 +1,42 @@
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
namespace MxGateway.Tests.TestSupport;
/// <summary>
/// <see cref="IConstraintEnforcer"/> that permits every operation, for tests that
/// exercise gRPC service or interceptor behaviour without constraint policy.
/// </summary>
public sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
{
/// <inheritdoc />
public Task<ConstraintFailure?> CheckReadTagAsync(
ApiKeyIdentity? identity,
string tagAddress,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task<ConstraintFailure?> CheckReadHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task<ConstraintFailure?> CheckWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);
/// <inheritdoc />
public Task RecordDenialAsync(
ApiKeyIdentity? identity,
string commandKind,
string target,
ConstraintFailure failure,
CancellationToken cancellationToken) => Task.CompletedTask;
}
@@ -0,0 +1,50 @@
using Grpc.Core;
namespace MxGateway.Tests.TestSupport;
/// <summary>
/// Thread-safe <see cref="IServerStreamWriter{T}"/> that records every written message
/// and lets a test await the first message with a timeout.
/// </summary>
/// <typeparam name="T">The streamed message type.</typeparam>
public sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
{
private readonly object _syncRoot = new();
private readonly TaskCompletionSource<T> _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly List<T> _messages = [];
/// <summary>Gets the messages written to this stream, in order.</summary>
public IReadOnlyList<T> Messages
{
get
{
lock (_syncRoot)
{
return _messages.ToArray();
}
}
}
/// <summary>Gets or sets options for writing messages to the stream.</summary>
public WriteOptions? WriteOptions { get; set; }
/// <summary>Records the supplied message.</summary>
/// <param name="message">The message to record.</param>
/// <returns>A completed task.</returns>
public Task WriteAsync(T message)
{
lock (_syncRoot)
{
_messages.Add(message);
}
_firstMessage.TrySetResult(message);
return Task.CompletedTask;
}
/// <summary>Waits for the first message to be written within the specified timeout.</summary>
/// <param name="timeout">Maximum time to wait for the first message.</param>
/// <returns>The first message written to this stream.</returns>
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout) =>
await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
}
@@ -0,0 +1,76 @@
using Grpc.Core;
namespace MxGateway.Tests.TestSupport;
/// <summary>
/// Minimal in-memory <see cref="ServerCallContext"/> for exercising gRPC service
/// implementations directly in unit tests, without a real gRPC transport.
/// </summary>
public sealed class TestServerCallContext : ServerCallContext
{
private readonly Metadata _requestHeaders;
private readonly Metadata _responseTrailers = [];
private readonly Dictionary<object, object> _userState = [];
private readonly CancellationToken _cancellationToken;
private Status _status;
private WriteOptions? _writeOptions;
/// <summary>Initializes the context with the supplied request headers and cancellation token.</summary>
/// <param name="requestHeaders">Request headers visible to the service; defaults to empty.</param>
/// <param name="cancellationToken">Cancellation token surfaced to the service.</param>
public TestServerCallContext(Metadata? requestHeaders = null, CancellationToken cancellationToken = default)
{
_requestHeaders = requestHeaders ?? [];
_cancellationToken = cancellationToken;
}
/// <inheritdoc />
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
/// <inheritdoc />
protected override string HostCore => "localhost";
/// <inheritdoc />
protected override string PeerCore => "ipv4:127.0.0.1:5000";
/// <inheritdoc />
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
/// <inheritdoc />
protected override Metadata RequestHeadersCore => _requestHeaders;
/// <inheritdoc />
protected override CancellationToken CancellationTokenCore => _cancellationToken;
/// <inheritdoc />
protected override Metadata ResponseTrailersCore => _responseTrailers;
/// <inheritdoc />
protected override Status StatusCore
{
get => _status;
set => _status = value;
}
/// <inheritdoc />
protected override WriteOptions? WriteOptionsCore
{
get => _writeOptions;
set => _writeOptions = value;
}
/// <inheritdoc />
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
/// <inheritdoc />
protected override IDictionary<object, object> UserStateCore => _userState;
/// <inheritdoc />
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) => Task.CompletedTask;
/// <inheritdoc />
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions? options) =>
throw new NotSupportedException();
}
+8
View File
@@ -0,0 +1,8 @@
{
"$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
"appDomain": "denied",
"parallelizeAssembly": false,
"parallelizeTestCollections": true,
"maxParallelThreads": -1,
"longRunningTestSeconds": 30
}