Add live MXAccess worker smoke test #91
@@ -35,6 +35,45 @@ inside the test.
|
||||
`OpenSession`, `Register`, `AddItem`, `Advise`, one streamed `OnDataChange`
|
||||
event, and `CloseSession` without loading MXAccess COM.
|
||||
|
||||
## Live MXAccess Smoke
|
||||
|
||||
`WorkerLiveMxAccessSmokeTests` in `src/MxGateway.IntegrationTests/` composes the
|
||||
real gRPC service, `SessionManager`, `SessionWorkerClientFactory`,
|
||||
`WorkerClient`, `WorkerProcessLauncher`, and `MxGateway.Worker.exe`. It is
|
||||
skipped unless `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` is set because it creates
|
||||
the installed MXAccess COM object and depends on live provider state.
|
||||
|
||||
The live smoke opens a gateway session, launches the x86 worker, runs
|
||||
`Register`, `AddItem`, and `Advise`, waits a bounded time for one
|
||||
`OnDataChange`, and closes the session in a `finally` block so the worker gets a
|
||||
graceful shutdown request even when a command or event assertion fails.
|
||||
|
||||
Build the worker before running the smoke:
|
||||
|
||||
```bash
|
||||
dotnet build src/MxGateway.Worker/MxGateway.Worker.csproj -p:Platform=x86
|
||||
```
|
||||
|
||||
Run the smoke explicitly:
|
||||
|
||||
```bash
|
||||
$env:MXGATEWAY_RUN_LIVE_MXACCESS_TESTS = "1"
|
||||
dotnet test src/MxGateway.IntegrationTests/MxGateway.IntegrationTests.csproj --filter FullyQualifiedName~WorkerLiveMxAccessSmokeTests
|
||||
```
|
||||
|
||||
Optional live smoke variables:
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `MXGATEWAY_LIVE_MXACCESS_WORKER_EXE` | First existing `MxGateway.Worker.exe` under `src/MxGateway.Worker/bin/...` | Worker executable path. Set this when running against a packaged worker or a non-default build output. |
|
||||
| `MXGATEWAY_LIVE_MXACCESS_ITEM` | `TestChildObject.TestInt` | MXAccess item reference used by `AddItem`. |
|
||||
| `MXGATEWAY_LIVE_MXACCESS_CLIENT_NAME` | `MxGateway.IntegrationTests` | Client name passed to `Register`. |
|
||||
| `MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS` | `15` | Maximum wait for the first `OnDataChange`. |
|
||||
|
||||
The test output includes session id, worker process id, command status,
|
||||
HRESULT/status diagnostics, event sequence and handles, close status, and worker
|
||||
stdout/stderr lines emitted during the run.
|
||||
|
||||
## Focused Commands
|
||||
|
||||
Run the fake worker tests after changing gateway worker IPC, session startup, or
|
||||
|
||||
@@ -807,6 +807,14 @@ tests. `AddItem` uses `TestChildObject.TestInt` by default and accepts an
|
||||
override through `MXGATEWAY_LIVE_MXACCESS_ITEM`; `AddItem2` uses the captured
|
||||
parity fixture shape `AddItem2("TestInt", "TestChildObject")`.
|
||||
|
||||
`WorkerLiveMxAccessSmokeTests` in `src/MxGateway.IntegrationTests/` uses the
|
||||
same opt-in variable for the gateway-to-worker live smoke. It launches the x86
|
||||
worker through `WorkerProcessLauncher`, opens a gateway session, runs
|
||||
`Register`, `AddItem`, and `Advise`, waits for one `OnDataChange`, and closes
|
||||
the session. The smoke accepts `MXGATEWAY_LIVE_MXACCESS_WORKER_EXE` for a
|
||||
non-default worker executable path and
|
||||
`MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS` for the bounded event wait.
|
||||
|
||||
## Initial Implementation Slice
|
||||
|
||||
The first worker slice should implement:
|
||||
|
||||
@@ -3,10 +3,92 @@ namespace MxGateway.IntegrationTests;
|
||||
public static class IntegrationTestEnvironment
|
||||
{
|
||||
public const string LiveMxAccessVariableName = "MXGATEWAY_RUN_LIVE_MXACCESS_TESTS";
|
||||
public const string LiveMxAccessWorkerExecutableVariableName = "MXGATEWAY_LIVE_MXACCESS_WORKER_EXE";
|
||||
public const string LiveMxAccessItemVariableName = "MXGATEWAY_LIVE_MXACCESS_ITEM";
|
||||
public const string LiveMxAccessClientNameVariableName = "MXGATEWAY_LIVE_MXACCESS_CLIENT_NAME";
|
||||
public const string LiveMxAccessEventTimeoutSecondsVariableName = "MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS";
|
||||
|
||||
public static bool LiveMxAccessTestsEnabled =>
|
||||
string.Equals(
|
||||
Environment.GetEnvironmentVariable(LiveMxAccessVariableName),
|
||||
"1",
|
||||
StringComparison.Ordinal);
|
||||
|
||||
public static string LiveMxAccessItem =>
|
||||
GetOptionalEnvironmentVariable(
|
||||
LiveMxAccessItemVariableName,
|
||||
"TestChildObject.TestInt");
|
||||
|
||||
public static string LiveMxAccessClientName =>
|
||||
GetOptionalEnvironmentVariable(
|
||||
LiveMxAccessClientNameVariableName,
|
||||
"MxGateway.IntegrationTests");
|
||||
|
||||
public static TimeSpan LiveMxAccessEventTimeout =>
|
||||
TimeSpan.FromSeconds(GetPositiveIntegerEnvironmentVariable(
|
||||
LiveMxAccessEventTimeoutSecondsVariableName,
|
||||
defaultValue: 15));
|
||||
|
||||
public static string ResolveLiveMxAccessWorkerExecutablePath()
|
||||
{
|
||||
string? configuredPath = Environment.GetEnvironmentVariable(LiveMxAccessWorkerExecutableVariableName);
|
||||
if (!string.IsNullOrWhiteSpace(configuredPath))
|
||||
{
|
||||
return Path.GetFullPath(configuredPath);
|
||||
}
|
||||
|
||||
string repositoryRoot = ResolveRepositoryRoot(AppContext.BaseDirectory);
|
||||
string[] candidatePaths =
|
||||
[
|
||||
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Debug", "net48", "MxGateway.Worker.exe"),
|
||||
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Debug", "net48", "MxGateway.Worker.exe"),
|
||||
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "net48", "MxGateway.Worker.exe"),
|
||||
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Release", "net48", "MxGateway.Worker.exe"),
|
||||
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "MxGateway.Worker.exe"),
|
||||
];
|
||||
|
||||
return candidatePaths.FirstOrDefault(File.Exists)
|
||||
?? candidatePaths[0];
|
||||
}
|
||||
|
||||
private static string GetOptionalEnvironmentVariable(
|
||||
string name,
|
||||
string defaultValue)
|
||||
{
|
||||
string? value = Environment.GetEnvironmentVariable(name);
|
||||
return string.IsNullOrWhiteSpace(value)
|
||||
? defaultValue
|
||||
: value;
|
||||
}
|
||||
|
||||
private static int GetPositiveIntegerEnvironmentVariable(
|
||||
string name,
|
||||
int defaultValue)
|
||||
{
|
||||
string? value = Environment.GetEnvironmentVariable(name);
|
||||
if (int.TryParse(value, out int parsed) && parsed > 0)
|
||||
{
|
||||
return parsed;
|
||||
}
|
||||
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
internal static string ResolveRepositoryRoot(string startDirectory)
|
||||
{
|
||||
DirectoryInfo? directory = new(startDirectory);
|
||||
while (directory is not null)
|
||||
{
|
||||
if ((Directory.Exists(Path.Combine(directory.FullName, ".git"))
|
||||
|| File.Exists(Path.Combine(directory.FullName, ".git")))
|
||||
&& Directory.Exists(Path.Combine(directory.FullName, "src")))
|
||||
{
|
||||
return directory.FullName;
|
||||
}
|
||||
|
||||
directory = directory.Parent;
|
||||
}
|
||||
|
||||
return Directory.GetCurrentDirectory();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,4 +9,37 @@ public sealed class IntegrationTestEnvironmentTests
|
||||
"MXGATEWAY_RUN_LIVE_MXACCESS_TESTS",
|
||||
IntegrationTestEnvironment.LiveMxAccessVariableName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LiveMxAccessWorkerExecutable_UsesDocumentedEnvironmentVariable()
|
||||
{
|
||||
Assert.Equal(
|
||||
"MXGATEWAY_LIVE_MXACCESS_WORKER_EXE",
|
||||
IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ResolveRepositoryRoot_AcceptsGitWorktreeFile()
|
||||
{
|
||||
string temporaryRoot = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
|
||||
string nestedDirectory = Path.Combine(temporaryRoot, "tests", "bin");
|
||||
|
||||
try
|
||||
{
|
||||
Directory.CreateDirectory(nestedDirectory);
|
||||
Directory.CreateDirectory(Path.Combine(temporaryRoot, "src"));
|
||||
File.WriteAllText(Path.Combine(temporaryRoot, ".git"), "gitdir: ../.git/worktrees/test");
|
||||
|
||||
string repositoryRoot = IntegrationTestEnvironment.ResolveRepositoryRoot(nestedDirectory);
|
||||
|
||||
Assert.Equal(temporaryRoot, repositoryRoot);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (Directory.Exists(temporaryRoot))
|
||||
{
|
||||
Directory.Delete(temporaryRoot, recursive: true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
namespace MxGateway.IntegrationTests;
|
||||
|
||||
public sealed class LiveMxAccessFactAttribute : FactAttribute
|
||||
{
|
||||
public LiveMxAccessFactAttribute()
|
||||
{
|
||||
if (!IntegrationTestEnvironment.LiveMxAccessTestsEnabled)
|
||||
{
|
||||
Skip = $"Set {IntegrationTestEnvironment.LiveMxAccessVariableName}=1 to run live MXAccess tests.";
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
|
||||
<ProjectReference Include="..\MxGateway.Server\MxGateway.Server.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -0,0 +1,517 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Grpc;
|
||||
using MxGateway.Server.Metrics;
|
||||
using MxGateway.Server.Security.Authorization;
|
||||
using MxGateway.Server.Sessions;
|
||||
using MxGateway.Server.Workers;
|
||||
using Xunit.Abstractions;
|
||||
|
||||
namespace MxGateway.IntegrationTests;
|
||||
|
||||
public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
{
|
||||
private static readonly TimeSpan CommandTimeout = TimeSpan.FromSeconds(15);
|
||||
private static readonly TimeSpan StreamShutdownTimeout = TimeSpan.FromSeconds(10);
|
||||
|
||||
[LiveMxAccessFact]
|
||||
[Trait("Category", "LiveMxAccess")]
|
||||
public async Task GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses()
|
||||
{
|
||||
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
|
||||
Assert.True(
|
||||
File.Exists(workerExecutablePath),
|
||||
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
|
||||
|
||||
TestWorkerProcessFactory processFactory = new(output);
|
||||
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
|
||||
|
||||
string? sessionId = null;
|
||||
RecordingServerStreamWriter<MxEvent>? eventWriter = null;
|
||||
Task? streamTask = null;
|
||||
|
||||
try
|
||||
{
|
||||
OpenSessionReply openReply = await fixture.Service.OpenSession(
|
||||
new OpenSessionRequest
|
||||
{
|
||||
ClientSessionName = "live-mxaccess-smoke",
|
||||
ClientCorrelationId = "live-open",
|
||||
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
|
||||
},
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
|
||||
sessionId = openReply.SessionId;
|
||||
output.WriteLine($"OpenSession status={openReply.ProtocolStatus.Code} session={sessionId} worker_pid={openReply.WorkerProcessId}");
|
||||
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
|
||||
Assert.True(openReply.WorkerProcessId > 0);
|
||||
|
||||
eventWriter = new RecordingServerStreamWriter<MxEvent>();
|
||||
streamTask = fixture.Service.StreamEvents(
|
||||
new StreamEventsRequest { SessionId = sessionId },
|
||||
eventWriter,
|
||||
new TestServerCallContext());
|
||||
|
||||
MxCommandReply registerReply = await fixture.Service.Invoke(
|
||||
CreateRegisterRequest(sessionId),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Register", registerReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
|
||||
Assert.True(registerReply.Register.ServerHandle > 0);
|
||||
|
||||
MxCommandReply addItemReply = await fixture.Service.Invoke(
|
||||
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("AddItem", addItemReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
|
||||
Assert.True(addItemReply.AddItem.ItemHandle > 0);
|
||||
|
||||
MxCommandReply adviseReply = await fixture.Service.Invoke(
|
||||
CreateAdviseRequest(
|
||||
sessionId,
|
||||
registerReply.Register.ServerHandle,
|
||||
addItemReply.AddItem.ItemHandle),
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
LogReply("Advise", adviseReply);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
|
||||
|
||||
MxEvent dataChange = await eventWriter
|
||||
.WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout)
|
||||
.ConfigureAwait(false);
|
||||
LogEvent(dataChange);
|
||||
|
||||
Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family);
|
||||
Assert.Equal(sessionId, dataChange.SessionId);
|
||||
Assert.Equal(registerReply.Register.ServerHandle, dataChange.ServerHandle);
|
||||
Assert.Equal(addItemReply.AddItem.ItemHandle, dataChange.ItemHandle);
|
||||
}
|
||||
finally
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (streamTask is not null)
|
||||
{
|
||||
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateRegisterRequest(string sessionId)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "live-register",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Register,
|
||||
Register = new RegisterCommand
|
||||
{
|
||||
ClientName = IntegrationTestEnvironment.LiveMxAccessClientName,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateAddItemRequest(
|
||||
string sessionId,
|
||||
int serverHandle)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "live-add-item",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AddItem,
|
||||
AddItem = new AddItemCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemDefinition = IntegrationTestEnvironment.LiveMxAccessItem,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateAdviseRequest(
|
||||
string sessionId,
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "live-advise",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Advise,
|
||||
Advise = new AdviseCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private async Task CloseSessionAsync(
|
||||
GatewayServiceFixture fixture,
|
||||
string sessionId)
|
||||
{
|
||||
CloseSessionReply closeReply = await fixture.Service.CloseSession(
|
||||
new CloseSessionRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "live-close",
|
||||
},
|
||||
new TestServerCallContext()).ConfigureAwait(false);
|
||||
|
||||
output.WriteLine($"CloseSession status={closeReply.ProtocolStatus.Code} final_state={closeReply.FinalState}");
|
||||
}
|
||||
|
||||
private void LogReply(
|
||||
string method,
|
||||
MxCommandReply reply)
|
||||
{
|
||||
output.WriteLine(
|
||||
$"{method} status={reply.ProtocolStatus.Code} hresult={reply.Hresult} diagnostic={reply.DiagnosticMessage}");
|
||||
|
||||
foreach (MxStatusProxy status in reply.Statuses)
|
||||
{
|
||||
output.WriteLine(
|
||||
$"{method} mxstatus success={status.Success} category={status.Category} detail={status.Detail} text={status.DiagnosticText}");
|
||||
}
|
||||
}
|
||||
|
||||
private void LogEvent(MxEvent dataChange)
|
||||
{
|
||||
output.WriteLine(
|
||||
$"Event family={dataChange.Family} worker_sequence={dataChange.WorkerSequence} server_handle={dataChange.ServerHandle} item_handle={dataChange.ItemHandle} quality={dataChange.Quality}");
|
||||
output.WriteLine(
|
||||
$"Event value_type={dataChange.Value?.DataType} raw_status={dataChange.RawStatus}");
|
||||
}
|
||||
|
||||
private sealed class GatewayServiceFixture : IAsyncDisposable
|
||||
{
|
||||
private readonly GatewayMetrics _metrics = new();
|
||||
private readonly SessionRegistry _registry = new();
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
|
||||
public GatewayServiceFixture(
|
||||
string workerExecutablePath,
|
||||
IWorkerProcessFactory processFactory,
|
||||
ITestOutputHelper output)
|
||||
{
|
||||
IOptions<GatewayOptions> options = Options.Create(CreateOptions(workerExecutablePath));
|
||||
_loggerFactory = LoggerFactory.Create(builder => builder.AddProvider(new TestOutputLoggerProvider(output)));
|
||||
WorkerProcessLauncher launcher = new(
|
||||
options,
|
||||
processFactory,
|
||||
new WorkerProcessStartedProbe(),
|
||||
_metrics);
|
||||
SessionWorkerClientFactory workerClientFactory = new(
|
||||
launcher,
|
||||
options,
|
||||
_metrics,
|
||||
_loggerFactory);
|
||||
SessionManager sessionManager = new(
|
||||
_registry,
|
||||
workerClientFactory,
|
||||
options,
|
||||
_metrics,
|
||||
logger: _loggerFactory.CreateLogger<SessionManager>());
|
||||
MxAccessGrpcMapper mapper = new();
|
||||
EventStreamService eventStreamService = new(
|
||||
sessionManager,
|
||||
options,
|
||||
mapper,
|
||||
_metrics,
|
||||
_loggerFactory.CreateLogger<EventStreamService>());
|
||||
|
||||
Service = new MxAccessGatewayService(
|
||||
sessionManager,
|
||||
new GatewayRequestIdentityAccessor(),
|
||||
new MxAccessGrpcRequestValidator(),
|
||||
mapper,
|
||||
eventStreamService,
|
||||
_loggerFactory.CreateLogger<MxAccessGatewayService>());
|
||||
}
|
||||
|
||||
public MxAccessGatewayService Service { get; }
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
foreach (GatewaySession session in _registry.Snapshot())
|
||||
{
|
||||
await session.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_loggerFactory.Dispose();
|
||||
_metrics.Dispose();
|
||||
}
|
||||
|
||||
private static GatewayOptions CreateOptions(string workerExecutablePath)
|
||||
{
|
||||
return new GatewayOptions
|
||||
{
|
||||
Worker = new WorkerOptions
|
||||
{
|
||||
ExecutablePath = workerExecutablePath,
|
||||
StartupTimeoutSeconds = 30,
|
||||
ShutdownTimeoutSeconds = 15,
|
||||
HeartbeatIntervalSeconds = 5,
|
||||
HeartbeatGraceSeconds = 15,
|
||||
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
|
||||
RequiredArchitecture = WorkerArchitecture.X86,
|
||||
},
|
||||
Sessions = new SessionOptions
|
||||
{
|
||||
DefaultCommandTimeoutSeconds = 15,
|
||||
MaxSessions = 1,
|
||||
},
|
||||
Events = new EventOptions
|
||||
{
|
||||
QueueCapacity = 32,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
|
||||
{
|
||||
private readonly object syncRoot = new();
|
||||
private readonly TaskCompletionSource<T> firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly List<T> messages = [];
|
||||
|
||||
public IReadOnlyList<T> Messages
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (syncRoot)
|
||||
{
|
||||
return messages.ToArray();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public WriteOptions? WriteOptions { get; set; }
|
||||
|
||||
public Task WriteAsync(T message)
|
||||
{
|
||||
lock (syncRoot)
|
||||
{
|
||||
messages.Add(message);
|
||||
}
|
||||
|
||||
firstMessage.TrySetResult(message);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
|
||||
{
|
||||
return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
|
||||
{
|
||||
private readonly Metadata requestHeaders = [];
|
||||
private readonly Metadata responseTrailers = [];
|
||||
private readonly Dictionary<object, object> userState = [];
|
||||
private Status status;
|
||||
private WriteOptions? writeOptions;
|
||||
|
||||
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
|
||||
|
||||
protected override string HostCore => "localhost";
|
||||
|
||||
protected override string PeerCore => "ipv4:127.0.0.1:5000";
|
||||
|
||||
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
|
||||
|
||||
protected override Metadata RequestHeadersCore => requestHeaders;
|
||||
|
||||
protected override CancellationToken CancellationTokenCore => cancellationToken;
|
||||
|
||||
protected override Metadata ResponseTrailersCore => responseTrailers;
|
||||
|
||||
protected override Status StatusCore
|
||||
{
|
||||
get => status;
|
||||
set => status = value;
|
||||
}
|
||||
|
||||
protected override WriteOptions? WriteOptionsCore
|
||||
{
|
||||
get => writeOptions;
|
||||
set => writeOptions = value;
|
||||
}
|
||||
|
||||
protected override AuthContext AuthContextCore { get; } = new(
|
||||
string.Empty,
|
||||
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
|
||||
|
||||
protected override IDictionary<object, object> UserStateCore => userState;
|
||||
|
||||
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
protected override ContextPropagationToken CreatePropagationTokenCore(
|
||||
ContextPropagationOptions? options)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestWorkerProcessFactory(ITestOutputHelper output) : IWorkerProcessFactory
|
||||
{
|
||||
private readonly ConcurrentBag<TestWorkerProcess> processes = [];
|
||||
|
||||
public IWorkerProcess Start(ProcessStartInfo startInfo)
|
||||
{
|
||||
startInfo.RedirectStandardError = true;
|
||||
startInfo.RedirectStandardOutput = true;
|
||||
startInfo.UseShellExecute = false;
|
||||
|
||||
Process process = new()
|
||||
{
|
||||
StartInfo = startInfo,
|
||||
EnableRaisingEvents = true,
|
||||
};
|
||||
|
||||
process.OutputDataReceived += (_, args) => WriteWorkerOutput("stdout", args.Data);
|
||||
process.ErrorDataReceived += (_, args) => WriteWorkerOutput("stderr", args.Data);
|
||||
|
||||
if (!process.Start())
|
||||
{
|
||||
process.Dispose();
|
||||
throw new InvalidOperationException("Worker process failed to start.");
|
||||
}
|
||||
|
||||
process.BeginOutputReadLine();
|
||||
process.BeginErrorReadLine();
|
||||
|
||||
TestWorkerProcess workerProcess = new(process);
|
||||
processes.Add(workerProcess);
|
||||
output.WriteLine($"WorkerProcess started pid={workerProcess.Id} path={startInfo.FileName}");
|
||||
|
||||
return workerProcess;
|
||||
}
|
||||
|
||||
public async Task WaitForProcessesAsync(TimeSpan timeout)
|
||||
{
|
||||
foreach (TestWorkerProcess process in processes)
|
||||
{
|
||||
if (process.HasExited)
|
||||
{
|
||||
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
|
||||
continue;
|
||||
}
|
||||
|
||||
using CancellationTokenSource timeoutCancellation = new(timeout);
|
||||
await process.WaitForExitAsync(timeoutCancellation.Token).ConfigureAwait(false);
|
||||
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
|
||||
}
|
||||
}
|
||||
|
||||
private void WriteWorkerOutput(
|
||||
string streamName,
|
||||
string? line)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(line))
|
||||
{
|
||||
output.WriteLine($"worker_{streamName}: {line}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestWorkerProcess(Process process) : IWorkerProcess
|
||||
{
|
||||
public int Id => process.Id;
|
||||
|
||||
public bool HasExited => process.HasExited;
|
||||
|
||||
public int? ExitCode => process.HasExited ? process.ExitCode : null;
|
||||
|
||||
public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public void Kill(bool entireProcessTree)
|
||||
{
|
||||
process.Kill(entireProcessTree);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
process.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestOutputLoggerProvider(ITestOutputHelper output) : ILoggerProvider
|
||||
{
|
||||
public ILogger CreateLogger(string categoryName)
|
||||
{
|
||||
return new TestOutputLogger(output, categoryName);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestOutputLogger(
|
||||
ITestOutputHelper output,
|
||||
string categoryName) : ILogger
|
||||
{
|
||||
public IDisposable? BeginScope<TState>(TState state)
|
||||
where TState : notnull
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
public bool IsEnabled(LogLevel logLevel)
|
||||
{
|
||||
return logLevel >= LogLevel.Information;
|
||||
}
|
||||
|
||||
public void Log<TState>(
|
||||
LogLevel logLevel,
|
||||
EventId eventId,
|
||||
TState state,
|
||||
Exception? exception,
|
||||
Func<TState, Exception?, string> formatter)
|
||||
{
|
||||
if (!IsEnabled(logLevel))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
output.WriteLine($"{logLevel} {categoryName}: {formatter(state, exception)}");
|
||||
if (exception is not null)
|
||||
{
|
||||
output.WriteLine(exception.ToString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user