Merge pull request #91 from agent-2/issue-34-worker-live-mxaccess-smoke-test
Issue #34: Worker Live MXAccess Smoke Test
This commit was merged in pull request #91.
This commit is contained in:
@@ -35,6 +35,45 @@ inside the test.
|
|||||||
`OpenSession`, `Register`, `AddItem`, `Advise`, one streamed `OnDataChange`
|
`OpenSession`, `Register`, `AddItem`, `Advise`, one streamed `OnDataChange`
|
||||||
event, and `CloseSession` without loading MXAccess COM.
|
event, and `CloseSession` without loading MXAccess COM.
|
||||||
|
|
||||||
|
## Live MXAccess Smoke
|
||||||
|
|
||||||
|
`WorkerLiveMxAccessSmokeTests` in `src/MxGateway.IntegrationTests/` composes the
|
||||||
|
real gRPC service, `SessionManager`, `SessionWorkerClientFactory`,
|
||||||
|
`WorkerClient`, `WorkerProcessLauncher`, and `MxGateway.Worker.exe`. It is
|
||||||
|
skipped unless `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` is set because it creates
|
||||||
|
the installed MXAccess COM object and depends on live provider state.
|
||||||
|
|
||||||
|
The live smoke opens a gateway session, launches the x86 worker, runs
|
||||||
|
`Register`, `AddItem`, and `Advise`, waits a bounded time for one
|
||||||
|
`OnDataChange`, and closes the session in a `finally` block so the worker gets a
|
||||||
|
graceful shutdown request even when a command or event assertion fails.
|
||||||
|
|
||||||
|
Build the worker before running the smoke:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dotnet build src/MxGateway.Worker/MxGateway.Worker.csproj -p:Platform=x86
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the smoke explicitly:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
$env:MXGATEWAY_RUN_LIVE_MXACCESS_TESTS = "1"
|
||||||
|
dotnet test src/MxGateway.IntegrationTests/MxGateway.IntegrationTests.csproj --filter FullyQualifiedName~WorkerLiveMxAccessSmokeTests
|
||||||
|
```
|
||||||
|
|
||||||
|
Optional live smoke variables:
|
||||||
|
|
||||||
|
| Variable | Default | Description |
|
||||||
|
|----------|---------|-------------|
|
||||||
|
| `MXGATEWAY_LIVE_MXACCESS_WORKER_EXE` | First existing `MxGateway.Worker.exe` under `src/MxGateway.Worker/bin/...` | Worker executable path. Set this when running against a packaged worker or a non-default build output. |
|
||||||
|
| `MXGATEWAY_LIVE_MXACCESS_ITEM` | `TestChildObject.TestInt` | MXAccess item reference used by `AddItem`. |
|
||||||
|
| `MXGATEWAY_LIVE_MXACCESS_CLIENT_NAME` | `MxGateway.IntegrationTests` | Client name passed to `Register`. |
|
||||||
|
| `MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS` | `15` | Maximum wait for the first `OnDataChange`. |
|
||||||
|
|
||||||
|
The test output includes session id, worker process id, command status,
|
||||||
|
HRESULT/status diagnostics, event sequence and handles, close status, and worker
|
||||||
|
stdout/stderr lines emitted during the run.
|
||||||
|
|
||||||
## Focused Commands
|
## Focused Commands
|
||||||
|
|
||||||
Run the fake worker tests after changing gateway worker IPC, session startup, or
|
Run the fake worker tests after changing gateway worker IPC, session startup, or
|
||||||
|
|||||||
@@ -807,6 +807,14 @@ tests. `AddItem` uses `TestChildObject.TestInt` by default and accepts an
|
|||||||
override through `MXGATEWAY_LIVE_MXACCESS_ITEM`; `AddItem2` uses the captured
|
override through `MXGATEWAY_LIVE_MXACCESS_ITEM`; `AddItem2` uses the captured
|
||||||
parity fixture shape `AddItem2("TestInt", "TestChildObject")`.
|
parity fixture shape `AddItem2("TestInt", "TestChildObject")`.
|
||||||
|
|
||||||
|
`WorkerLiveMxAccessSmokeTests` in `src/MxGateway.IntegrationTests/` uses the
|
||||||
|
same opt-in variable for the gateway-to-worker live smoke. It launches the x86
|
||||||
|
worker through `WorkerProcessLauncher`, opens a gateway session, runs
|
||||||
|
`Register`, `AddItem`, and `Advise`, waits for one `OnDataChange`, and closes
|
||||||
|
the session. The smoke accepts `MXGATEWAY_LIVE_MXACCESS_WORKER_EXE` for a
|
||||||
|
non-default worker executable path and
|
||||||
|
`MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS` for the bounded event wait.
|
||||||
|
|
||||||
## Initial Implementation Slice
|
## Initial Implementation Slice
|
||||||
|
|
||||||
The first worker slice should implement:
|
The first worker slice should implement:
|
||||||
|
|||||||
@@ -3,10 +3,92 @@ namespace MxGateway.IntegrationTests;
|
|||||||
public static class IntegrationTestEnvironment
|
public static class IntegrationTestEnvironment
|
||||||
{
|
{
|
||||||
public const string LiveMxAccessVariableName = "MXGATEWAY_RUN_LIVE_MXACCESS_TESTS";
|
public const string LiveMxAccessVariableName = "MXGATEWAY_RUN_LIVE_MXACCESS_TESTS";
|
||||||
|
public const string LiveMxAccessWorkerExecutableVariableName = "MXGATEWAY_LIVE_MXACCESS_WORKER_EXE";
|
||||||
|
public const string LiveMxAccessItemVariableName = "MXGATEWAY_LIVE_MXACCESS_ITEM";
|
||||||
|
public const string LiveMxAccessClientNameVariableName = "MXGATEWAY_LIVE_MXACCESS_CLIENT_NAME";
|
||||||
|
public const string LiveMxAccessEventTimeoutSecondsVariableName = "MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS";
|
||||||
|
|
||||||
public static bool LiveMxAccessTestsEnabled =>
|
public static bool LiveMxAccessTestsEnabled =>
|
||||||
string.Equals(
|
string.Equals(
|
||||||
Environment.GetEnvironmentVariable(LiveMxAccessVariableName),
|
Environment.GetEnvironmentVariable(LiveMxAccessVariableName),
|
||||||
"1",
|
"1",
|
||||||
StringComparison.Ordinal);
|
StringComparison.Ordinal);
|
||||||
|
|
||||||
|
public static string LiveMxAccessItem =>
|
||||||
|
GetOptionalEnvironmentVariable(
|
||||||
|
LiveMxAccessItemVariableName,
|
||||||
|
"TestChildObject.TestInt");
|
||||||
|
|
||||||
|
public static string LiveMxAccessClientName =>
|
||||||
|
GetOptionalEnvironmentVariable(
|
||||||
|
LiveMxAccessClientNameVariableName,
|
||||||
|
"MxGateway.IntegrationTests");
|
||||||
|
|
||||||
|
public static TimeSpan LiveMxAccessEventTimeout =>
|
||||||
|
TimeSpan.FromSeconds(GetPositiveIntegerEnvironmentVariable(
|
||||||
|
LiveMxAccessEventTimeoutSecondsVariableName,
|
||||||
|
defaultValue: 15));
|
||||||
|
|
||||||
|
public static string ResolveLiveMxAccessWorkerExecutablePath()
|
||||||
|
{
|
||||||
|
string? configuredPath = Environment.GetEnvironmentVariable(LiveMxAccessWorkerExecutableVariableName);
|
||||||
|
if (!string.IsNullOrWhiteSpace(configuredPath))
|
||||||
|
{
|
||||||
|
return Path.GetFullPath(configuredPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
string repositoryRoot = ResolveRepositoryRoot(AppContext.BaseDirectory);
|
||||||
|
string[] candidatePaths =
|
||||||
|
[
|
||||||
|
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Debug", "net48", "MxGateway.Worker.exe"),
|
||||||
|
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Debug", "net48", "MxGateway.Worker.exe"),
|
||||||
|
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "net48", "MxGateway.Worker.exe"),
|
||||||
|
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Release", "net48", "MxGateway.Worker.exe"),
|
||||||
|
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "MxGateway.Worker.exe"),
|
||||||
|
];
|
||||||
|
|
||||||
|
return candidatePaths.FirstOrDefault(File.Exists)
|
||||||
|
?? candidatePaths[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string GetOptionalEnvironmentVariable(
|
||||||
|
string name,
|
||||||
|
string defaultValue)
|
||||||
|
{
|
||||||
|
string? value = Environment.GetEnvironmentVariable(name);
|
||||||
|
return string.IsNullOrWhiteSpace(value)
|
||||||
|
? defaultValue
|
||||||
|
: value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int GetPositiveIntegerEnvironmentVariable(
|
||||||
|
string name,
|
||||||
|
int defaultValue)
|
||||||
|
{
|
||||||
|
string? value = Environment.GetEnvironmentVariable(name);
|
||||||
|
if (int.TryParse(value, out int parsed) && parsed > 0)
|
||||||
|
{
|
||||||
|
return parsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal static string ResolveRepositoryRoot(string startDirectory)
|
||||||
|
{
|
||||||
|
DirectoryInfo? directory = new(startDirectory);
|
||||||
|
while (directory is not null)
|
||||||
|
{
|
||||||
|
if ((Directory.Exists(Path.Combine(directory.FullName, ".git"))
|
||||||
|
|| File.Exists(Path.Combine(directory.FullName, ".git")))
|
||||||
|
&& Directory.Exists(Path.Combine(directory.FullName, "src")))
|
||||||
|
{
|
||||||
|
return directory.FullName;
|
||||||
|
}
|
||||||
|
|
||||||
|
directory = directory.Parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Directory.GetCurrentDirectory();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,4 +9,37 @@ public sealed class IntegrationTestEnvironmentTests
|
|||||||
"MXGATEWAY_RUN_LIVE_MXACCESS_TESTS",
|
"MXGATEWAY_RUN_LIVE_MXACCESS_TESTS",
|
||||||
IntegrationTestEnvironment.LiveMxAccessVariableName);
|
IntegrationTestEnvironment.LiveMxAccessVariableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void LiveMxAccessWorkerExecutable_UsesDocumentedEnvironmentVariable()
|
||||||
|
{
|
||||||
|
Assert.Equal(
|
||||||
|
"MXGATEWAY_LIVE_MXACCESS_WORKER_EXE",
|
||||||
|
IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ResolveRepositoryRoot_AcceptsGitWorktreeFile()
|
||||||
|
{
|
||||||
|
string temporaryRoot = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
|
||||||
|
string nestedDirectory = Path.Combine(temporaryRoot, "tests", "bin");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Directory.CreateDirectory(nestedDirectory);
|
||||||
|
Directory.CreateDirectory(Path.Combine(temporaryRoot, "src"));
|
||||||
|
File.WriteAllText(Path.Combine(temporaryRoot, ".git"), "gitdir: ../.git/worktrees/test");
|
||||||
|
|
||||||
|
string repositoryRoot = IntegrationTestEnvironment.ResolveRepositoryRoot(nestedDirectory);
|
||||||
|
|
||||||
|
Assert.Equal(temporaryRoot, repositoryRoot);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (Directory.Exists(temporaryRoot))
|
||||||
|
{
|
||||||
|
Directory.Delete(temporaryRoot, recursive: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,12 @@
|
|||||||
|
namespace MxGateway.IntegrationTests;
|
||||||
|
|
||||||
|
public sealed class LiveMxAccessFactAttribute : FactAttribute
|
||||||
|
{
|
||||||
|
public LiveMxAccessFactAttribute()
|
||||||
|
{
|
||||||
|
if (!IntegrationTestEnvironment.LiveMxAccessTestsEnabled)
|
||||||
|
{
|
||||||
|
Skip = $"Set {IntegrationTestEnvironment.LiveMxAccessVariableName}=1 to run live MXAccess tests.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
|
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
|
||||||
|
<ProjectReference Include="..\MxGateway.Server\MxGateway.Server.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@@ -0,0 +1,517 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Diagnostics;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using Grpc.Core;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Grpc;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Security.Authorization;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
using Xunit.Abstractions;
|
||||||
|
|
||||||
|
namespace MxGateway.IntegrationTests;
|
||||||
|
|
||||||
|
public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||||
|
{
|
||||||
|
private static readonly TimeSpan CommandTimeout = TimeSpan.FromSeconds(15);
|
||||||
|
private static readonly TimeSpan StreamShutdownTimeout = TimeSpan.FromSeconds(10);
|
||||||
|
|
||||||
|
[LiveMxAccessFact]
|
||||||
|
[Trait("Category", "LiveMxAccess")]
|
||||||
|
public async Task GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses()
|
||||||
|
{
|
||||||
|
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
|
||||||
|
Assert.True(
|
||||||
|
File.Exists(workerExecutablePath),
|
||||||
|
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
|
||||||
|
|
||||||
|
TestWorkerProcessFactory processFactory = new(output);
|
||||||
|
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
|
||||||
|
|
||||||
|
string? sessionId = null;
|
||||||
|
RecordingServerStreamWriter<MxEvent>? eventWriter = null;
|
||||||
|
Task? streamTask = null;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
OpenSessionReply openReply = await fixture.Service.OpenSession(
|
||||||
|
new OpenSessionRequest
|
||||||
|
{
|
||||||
|
ClientSessionName = "live-mxaccess-smoke",
|
||||||
|
ClientCorrelationId = "live-open",
|
||||||
|
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
|
||||||
|
},
|
||||||
|
new TestServerCallContext()).ConfigureAwait(false);
|
||||||
|
|
||||||
|
sessionId = openReply.SessionId;
|
||||||
|
output.WriteLine($"OpenSession status={openReply.ProtocolStatus.Code} session={sessionId} worker_pid={openReply.WorkerProcessId}");
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
|
||||||
|
Assert.True(openReply.WorkerProcessId > 0);
|
||||||
|
|
||||||
|
eventWriter = new RecordingServerStreamWriter<MxEvent>();
|
||||||
|
streamTask = fixture.Service.StreamEvents(
|
||||||
|
new StreamEventsRequest { SessionId = sessionId },
|
||||||
|
eventWriter,
|
||||||
|
new TestServerCallContext());
|
||||||
|
|
||||||
|
MxCommandReply registerReply = await fixture.Service.Invoke(
|
||||||
|
CreateRegisterRequest(sessionId),
|
||||||
|
new TestServerCallContext()).ConfigureAwait(false);
|
||||||
|
LogReply("Register", registerReply);
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
|
||||||
|
Assert.True(registerReply.Register.ServerHandle > 0);
|
||||||
|
|
||||||
|
MxCommandReply addItemReply = await fixture.Service.Invoke(
|
||||||
|
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
|
||||||
|
new TestServerCallContext()).ConfigureAwait(false);
|
||||||
|
LogReply("AddItem", addItemReply);
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
|
||||||
|
Assert.True(addItemReply.AddItem.ItemHandle > 0);
|
||||||
|
|
||||||
|
MxCommandReply adviseReply = await fixture.Service.Invoke(
|
||||||
|
CreateAdviseRequest(
|
||||||
|
sessionId,
|
||||||
|
registerReply.Register.ServerHandle,
|
||||||
|
addItemReply.AddItem.ItemHandle),
|
||||||
|
new TestServerCallContext()).ConfigureAwait(false);
|
||||||
|
LogReply("Advise", adviseReply);
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
|
||||||
|
|
||||||
|
MxEvent dataChange = await eventWriter
|
||||||
|
.WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
LogEvent(dataChange);
|
||||||
|
|
||||||
|
Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family);
|
||||||
|
Assert.Equal(sessionId, dataChange.SessionId);
|
||||||
|
Assert.Equal(registerReply.Register.ServerHandle, dataChange.ServerHandle);
|
||||||
|
Assert.Equal(addItemReply.AddItem.ItemHandle, dataChange.ItemHandle);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!string.IsNullOrWhiteSpace(sessionId))
|
||||||
|
{
|
||||||
|
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (streamTask is not null)
|
||||||
|
{
|
||||||
|
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MxCommandRequest CreateRegisterRequest(string sessionId)
|
||||||
|
{
|
||||||
|
return new MxCommandRequest
|
||||||
|
{
|
||||||
|
SessionId = sessionId,
|
||||||
|
ClientCorrelationId = "live-register",
|
||||||
|
Command = new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.Register,
|
||||||
|
Register = new RegisterCommand
|
||||||
|
{
|
||||||
|
ClientName = IntegrationTestEnvironment.LiveMxAccessClientName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MxCommandRequest CreateAddItemRequest(
|
||||||
|
string sessionId,
|
||||||
|
int serverHandle)
|
||||||
|
{
|
||||||
|
return new MxCommandRequest
|
||||||
|
{
|
||||||
|
SessionId = sessionId,
|
||||||
|
ClientCorrelationId = "live-add-item",
|
||||||
|
Command = new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AddItem,
|
||||||
|
AddItem = new AddItemCommand
|
||||||
|
{
|
||||||
|
ServerHandle = serverHandle,
|
||||||
|
ItemDefinition = IntegrationTestEnvironment.LiveMxAccessItem,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MxCommandRequest CreateAdviseRequest(
|
||||||
|
string sessionId,
|
||||||
|
int serverHandle,
|
||||||
|
int itemHandle)
|
||||||
|
{
|
||||||
|
return new MxCommandRequest
|
||||||
|
{
|
||||||
|
SessionId = sessionId,
|
||||||
|
ClientCorrelationId = "live-advise",
|
||||||
|
Command = new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.Advise,
|
||||||
|
Advise = new AdviseCommand
|
||||||
|
{
|
||||||
|
ServerHandle = serverHandle,
|
||||||
|
ItemHandle = itemHandle,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task CloseSessionAsync(
|
||||||
|
GatewayServiceFixture fixture,
|
||||||
|
string sessionId)
|
||||||
|
{
|
||||||
|
CloseSessionReply closeReply = await fixture.Service.CloseSession(
|
||||||
|
new CloseSessionRequest
|
||||||
|
{
|
||||||
|
SessionId = sessionId,
|
||||||
|
ClientCorrelationId = "live-close",
|
||||||
|
},
|
||||||
|
new TestServerCallContext()).ConfigureAwait(false);
|
||||||
|
|
||||||
|
output.WriteLine($"CloseSession status={closeReply.ProtocolStatus.Code} final_state={closeReply.FinalState}");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void LogReply(
|
||||||
|
string method,
|
||||||
|
MxCommandReply reply)
|
||||||
|
{
|
||||||
|
output.WriteLine(
|
||||||
|
$"{method} status={reply.ProtocolStatus.Code} hresult={reply.Hresult} diagnostic={reply.DiagnosticMessage}");
|
||||||
|
|
||||||
|
foreach (MxStatusProxy status in reply.Statuses)
|
||||||
|
{
|
||||||
|
output.WriteLine(
|
||||||
|
$"{method} mxstatus success={status.Success} category={status.Category} detail={status.Detail} text={status.DiagnosticText}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void LogEvent(MxEvent dataChange)
|
||||||
|
{
|
||||||
|
output.WriteLine(
|
||||||
|
$"Event family={dataChange.Family} worker_sequence={dataChange.WorkerSequence} server_handle={dataChange.ServerHandle} item_handle={dataChange.ItemHandle} quality={dataChange.Quality}");
|
||||||
|
output.WriteLine(
|
||||||
|
$"Event value_type={dataChange.Value?.DataType} raw_status={dataChange.RawStatus}");
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class GatewayServiceFixture : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly GatewayMetrics _metrics = new();
|
||||||
|
private readonly SessionRegistry _registry = new();
|
||||||
|
private readonly ILoggerFactory _loggerFactory;
|
||||||
|
|
||||||
|
public GatewayServiceFixture(
|
||||||
|
string workerExecutablePath,
|
||||||
|
IWorkerProcessFactory processFactory,
|
||||||
|
ITestOutputHelper output)
|
||||||
|
{
|
||||||
|
IOptions<GatewayOptions> options = Options.Create(CreateOptions(workerExecutablePath));
|
||||||
|
_loggerFactory = LoggerFactory.Create(builder => builder.AddProvider(new TestOutputLoggerProvider(output)));
|
||||||
|
WorkerProcessLauncher launcher = new(
|
||||||
|
options,
|
||||||
|
processFactory,
|
||||||
|
new WorkerProcessStartedProbe(),
|
||||||
|
_metrics);
|
||||||
|
SessionWorkerClientFactory workerClientFactory = new(
|
||||||
|
launcher,
|
||||||
|
options,
|
||||||
|
_metrics,
|
||||||
|
_loggerFactory);
|
||||||
|
SessionManager sessionManager = new(
|
||||||
|
_registry,
|
||||||
|
workerClientFactory,
|
||||||
|
options,
|
||||||
|
_metrics,
|
||||||
|
logger: _loggerFactory.CreateLogger<SessionManager>());
|
||||||
|
MxAccessGrpcMapper mapper = new();
|
||||||
|
EventStreamService eventStreamService = new(
|
||||||
|
sessionManager,
|
||||||
|
options,
|
||||||
|
mapper,
|
||||||
|
_metrics,
|
||||||
|
_loggerFactory.CreateLogger<EventStreamService>());
|
||||||
|
|
||||||
|
Service = new MxAccessGatewayService(
|
||||||
|
sessionManager,
|
||||||
|
new GatewayRequestIdentityAccessor(),
|
||||||
|
new MxAccessGrpcRequestValidator(),
|
||||||
|
mapper,
|
||||||
|
eventStreamService,
|
||||||
|
_loggerFactory.CreateLogger<MxAccessGatewayService>());
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessGatewayService Service { get; }
|
||||||
|
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
foreach (GatewaySession session in _registry.Snapshot())
|
||||||
|
{
|
||||||
|
await session.DisposeAsync().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
_loggerFactory.Dispose();
|
||||||
|
_metrics.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GatewayOptions CreateOptions(string workerExecutablePath)
|
||||||
|
{
|
||||||
|
return new GatewayOptions
|
||||||
|
{
|
||||||
|
Worker = new WorkerOptions
|
||||||
|
{
|
||||||
|
ExecutablePath = workerExecutablePath,
|
||||||
|
StartupTimeoutSeconds = 30,
|
||||||
|
ShutdownTimeoutSeconds = 15,
|
||||||
|
HeartbeatIntervalSeconds = 5,
|
||||||
|
HeartbeatGraceSeconds = 15,
|
||||||
|
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
|
||||||
|
RequiredArchitecture = WorkerArchitecture.X86,
|
||||||
|
},
|
||||||
|
Sessions = new SessionOptions
|
||||||
|
{
|
||||||
|
DefaultCommandTimeoutSeconds = 15,
|
||||||
|
MaxSessions = 1,
|
||||||
|
},
|
||||||
|
Events = new EventOptions
|
||||||
|
{
|
||||||
|
QueueCapacity = 32,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
|
||||||
|
{
|
||||||
|
private readonly object syncRoot = new();
|
||||||
|
private readonly TaskCompletionSource<T> firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
private readonly List<T> messages = [];
|
||||||
|
|
||||||
|
public IReadOnlyList<T> Messages
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
return messages.ToArray();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public WriteOptions? WriteOptions { get; set; }
|
||||||
|
|
||||||
|
public Task WriteAsync(T message)
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
messages.Add(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
firstMessage.TrySetResult(message);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
|
||||||
|
{
|
||||||
|
private readonly Metadata requestHeaders = [];
|
||||||
|
private readonly Metadata responseTrailers = [];
|
||||||
|
private readonly Dictionary<object, object> userState = [];
|
||||||
|
private Status status;
|
||||||
|
private WriteOptions? writeOptions;
|
||||||
|
|
||||||
|
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
|
||||||
|
|
||||||
|
protected override string HostCore => "localhost";
|
||||||
|
|
||||||
|
protected override string PeerCore => "ipv4:127.0.0.1:5000";
|
||||||
|
|
||||||
|
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
|
||||||
|
|
||||||
|
protected override Metadata RequestHeadersCore => requestHeaders;
|
||||||
|
|
||||||
|
protected override CancellationToken CancellationTokenCore => cancellationToken;
|
||||||
|
|
||||||
|
protected override Metadata ResponseTrailersCore => responseTrailers;
|
||||||
|
|
||||||
|
protected override Status StatusCore
|
||||||
|
{
|
||||||
|
get => status;
|
||||||
|
set => status = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override WriteOptions? WriteOptionsCore
|
||||||
|
{
|
||||||
|
get => writeOptions;
|
||||||
|
set => writeOptions = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override AuthContext AuthContextCore { get; } = new(
|
||||||
|
string.Empty,
|
||||||
|
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
|
||||||
|
|
||||||
|
protected override IDictionary<object, object> UserStateCore => userState;
|
||||||
|
|
||||||
|
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override ContextPropagationToken CreatePropagationTokenCore(
|
||||||
|
ContextPropagationOptions? options)
|
||||||
|
{
|
||||||
|
throw new NotSupportedException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TestWorkerProcessFactory(ITestOutputHelper output) : IWorkerProcessFactory
|
||||||
|
{
|
||||||
|
private readonly ConcurrentBag<TestWorkerProcess> processes = [];
|
||||||
|
|
||||||
|
public IWorkerProcess Start(ProcessStartInfo startInfo)
|
||||||
|
{
|
||||||
|
startInfo.RedirectStandardError = true;
|
||||||
|
startInfo.RedirectStandardOutput = true;
|
||||||
|
startInfo.UseShellExecute = false;
|
||||||
|
|
||||||
|
Process process = new()
|
||||||
|
{
|
||||||
|
StartInfo = startInfo,
|
||||||
|
EnableRaisingEvents = true,
|
||||||
|
};
|
||||||
|
|
||||||
|
process.OutputDataReceived += (_, args) => WriteWorkerOutput("stdout", args.Data);
|
||||||
|
process.ErrorDataReceived += (_, args) => WriteWorkerOutput("stderr", args.Data);
|
||||||
|
|
||||||
|
if (!process.Start())
|
||||||
|
{
|
||||||
|
process.Dispose();
|
||||||
|
throw new InvalidOperationException("Worker process failed to start.");
|
||||||
|
}
|
||||||
|
|
||||||
|
process.BeginOutputReadLine();
|
||||||
|
process.BeginErrorReadLine();
|
||||||
|
|
||||||
|
TestWorkerProcess workerProcess = new(process);
|
||||||
|
processes.Add(workerProcess);
|
||||||
|
output.WriteLine($"WorkerProcess started pid={workerProcess.Id} path={startInfo.FileName}");
|
||||||
|
|
||||||
|
return workerProcess;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task WaitForProcessesAsync(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
foreach (TestWorkerProcess process in processes)
|
||||||
|
{
|
||||||
|
if (process.HasExited)
|
||||||
|
{
|
||||||
|
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
using CancellationTokenSource timeoutCancellation = new(timeout);
|
||||||
|
await process.WaitForExitAsync(timeoutCancellation.Token).ConfigureAwait(false);
|
||||||
|
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void WriteWorkerOutput(
|
||||||
|
string streamName,
|
||||||
|
string? line)
|
||||||
|
{
|
||||||
|
if (!string.IsNullOrWhiteSpace(line))
|
||||||
|
{
|
||||||
|
output.WriteLine($"worker_{streamName}: {line}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TestWorkerProcess(Process process) : IWorkerProcess
|
||||||
|
{
|
||||||
|
public int Id => process.Id;
|
||||||
|
|
||||||
|
public bool HasExited => process.HasExited;
|
||||||
|
|
||||||
|
public int? ExitCode => process.HasExited ? process.ExitCode : null;
|
||||||
|
|
||||||
|
public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Kill(bool entireProcessTree)
|
||||||
|
{
|
||||||
|
process.Kill(entireProcessTree);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
process.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TestOutputLoggerProvider(ITestOutputHelper output) : ILoggerProvider
|
||||||
|
{
|
||||||
|
public ILogger CreateLogger(string categoryName)
|
||||||
|
{
|
||||||
|
return new TestOutputLogger(output, categoryName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TestOutputLogger(
|
||||||
|
ITestOutputHelper output,
|
||||||
|
string categoryName) : ILogger
|
||||||
|
{
|
||||||
|
public IDisposable? BeginScope<TState>(TState state)
|
||||||
|
where TState : notnull
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsEnabled(LogLevel logLevel)
|
||||||
|
{
|
||||||
|
return logLevel >= LogLevel.Information;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Log<TState>(
|
||||||
|
LogLevel logLevel,
|
||||||
|
EventId eventId,
|
||||||
|
TState state,
|
||||||
|
Exception? exception,
|
||||||
|
Func<TState, Exception?, string> formatter)
|
||||||
|
{
|
||||||
|
if (!IsEnabled(logLevel))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
output.WriteLine($"{logLevel} {categoryName}: {formatter(state, exception)}");
|
||||||
|
if (exception is not null)
|
||||||
|
{
|
||||||
|
output.WriteLine(exception.ToString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user