Add live MXAccess worker smoke test

This commit is contained in:
Joseph Doherty
2026-04-26 19:58:33 -04:00
parent 160343aff4
commit 0f17a1d1d9
7 changed files with 666 additions and 0 deletions
@@ -3,10 +3,91 @@ namespace MxGateway.IntegrationTests;
public static class IntegrationTestEnvironment
{
public const string LiveMxAccessVariableName = "MXGATEWAY_RUN_LIVE_MXACCESS_TESTS";
public const string LiveMxAccessWorkerExecutableVariableName = "MXGATEWAY_LIVE_MXACCESS_WORKER_EXE";
public const string LiveMxAccessItemVariableName = "MXGATEWAY_LIVE_MXACCESS_ITEM";
public const string LiveMxAccessClientNameVariableName = "MXGATEWAY_LIVE_MXACCESS_CLIENT_NAME";
public const string LiveMxAccessEventTimeoutSecondsVariableName = "MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS";
public static bool LiveMxAccessTestsEnabled =>
string.Equals(
Environment.GetEnvironmentVariable(LiveMxAccessVariableName),
"1",
StringComparison.Ordinal);
public static string LiveMxAccessItem =>
GetOptionalEnvironmentVariable(
LiveMxAccessItemVariableName,
"TestChildObject.TestInt");
public static string LiveMxAccessClientName =>
GetOptionalEnvironmentVariable(
LiveMxAccessClientNameVariableName,
"MxGateway.IntegrationTests");
public static TimeSpan LiveMxAccessEventTimeout =>
TimeSpan.FromSeconds(GetPositiveIntegerEnvironmentVariable(
LiveMxAccessEventTimeoutSecondsVariableName,
defaultValue: 15));
public static string ResolveLiveMxAccessWorkerExecutablePath()
{
string? configuredPath = Environment.GetEnvironmentVariable(LiveMxAccessWorkerExecutableVariableName);
if (!string.IsNullOrWhiteSpace(configuredPath))
{
return Path.GetFullPath(configuredPath);
}
string repositoryRoot = ResolveRepositoryRoot();
string[] candidatePaths =
[
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Debug", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Debug", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Release", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "MxGateway.Worker.exe"),
];
return candidatePaths.FirstOrDefault(File.Exists)
?? candidatePaths[0];
}
private static string GetOptionalEnvironmentVariable(
string name,
string defaultValue)
{
string? value = Environment.GetEnvironmentVariable(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: value;
}
private static int GetPositiveIntegerEnvironmentVariable(
string name,
int defaultValue)
{
string? value = Environment.GetEnvironmentVariable(name);
if (int.TryParse(value, out int parsed) && parsed > 0)
{
return parsed;
}
return defaultValue;
}
private static string ResolveRepositoryRoot()
{
DirectoryInfo? directory = new(AppContext.BaseDirectory);
while (directory is not null)
{
if (Directory.Exists(Path.Combine(directory.FullName, ".git"))
&& Directory.Exists(Path.Combine(directory.FullName, "src")))
{
return directory.FullName;
}
directory = directory.Parent;
}
return Directory.GetCurrentDirectory();
}
}
@@ -9,4 +9,12 @@ public sealed class IntegrationTestEnvironmentTests
"MXGATEWAY_RUN_LIVE_MXACCESS_TESTS",
IntegrationTestEnvironment.LiveMxAccessVariableName);
}
[Fact]
public void LiveMxAccessWorkerExecutable_UsesDocumentedEnvironmentVariable()
{
Assert.Equal(
"MXGATEWAY_LIVE_MXACCESS_WORKER_EXE",
IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName);
}
}
@@ -0,0 +1,12 @@
namespace MxGateway.IntegrationTests;
public sealed class LiveMxAccessFactAttribute : FactAttribute
{
public LiveMxAccessFactAttribute()
{
if (!IntegrationTestEnvironment.LiveMxAccessTestsEnabled)
{
Skip = $"Set {IntegrationTestEnvironment.LiveMxAccessVariableName}=1 to run live MXAccess tests.";
}
}
}
@@ -18,6 +18,7 @@
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
<ProjectReference Include="..\MxGateway.Server\MxGateway.Server.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,517 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
using Xunit.Abstractions;
namespace MxGateway.IntegrationTests;
public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
{
private static readonly TimeSpan CommandTimeout = TimeSpan.FromSeconds(15);
private static readonly TimeSpan StreamShutdownTimeout = TimeSpan.FromSeconds(10);
[LiveMxAccessFact]
[Trait("Category", "LiveMxAccess")]
public async Task GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses()
{
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
Assert.True(
File.Exists(workerExecutablePath),
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
TestWorkerProcessFactory processFactory = new(output);
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
string? sessionId = null;
RecordingServerStreamWriter<MxEvent>? eventWriter = null;
Task? streamTask = null;
try
{
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "live-mxaccess-smoke",
ClientCorrelationId = "live-open",
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
},
new TestServerCallContext()).ConfigureAwait(false);
sessionId = openReply.SessionId;
output.WriteLine($"OpenSession status={openReply.ProtocolStatus.Code} session={sessionId} worker_pid={openReply.WorkerProcessId}");
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
Assert.True(openReply.WorkerProcessId > 0);
eventWriter = new RecordingServerStreamWriter<MxEvent>();
streamTask = fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
eventWriter,
new TestServerCallContext());
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("Register", registerReply);
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
Assert.True(registerReply.Register.ServerHandle > 0);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("AddItem", addItemReply);
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
Assert.True(addItemReply.AddItem.ItemHandle > 0);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(
sessionId,
registerReply.Register.ServerHandle,
addItemReply.AddItem.ItemHandle),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("Advise", adviseReply);
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
MxEvent dataChange = await eventWriter
.WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout)
.ConfigureAwait(false);
LogEvent(dataChange);
Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family);
Assert.Equal(sessionId, dataChange.SessionId);
Assert.Equal(registerReply.Register.ServerHandle, dataChange.ServerHandle);
Assert.Equal(addItemReply.AddItem.ItemHandle, dataChange.ItemHandle);
}
finally
{
try
{
if (!string.IsNullOrWhiteSpace(sessionId))
{
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
}
if (streamTask is not null)
{
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
}
}
finally
{
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
}
}
}
private static MxCommandRequest CreateRegisterRequest(string sessionId)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-register",
Command = new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand
{
ClientName = IntegrationTestEnvironment.LiveMxAccessClientName,
},
},
};
}
private static MxCommandRequest CreateAddItemRequest(
string sessionId,
int serverHandle)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-add-item",
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = IntegrationTestEnvironment.LiveMxAccessItem,
},
},
};
}
private static MxCommandRequest CreateAdviseRequest(
string sessionId,
int serverHandle,
int itemHandle)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-advise",
Command = new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
};
}
private async Task CloseSessionAsync(
GatewayServiceFixture fixture,
string sessionId)
{
CloseSessionReply closeReply = await fixture.Service.CloseSession(
new CloseSessionRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-close",
},
new TestServerCallContext()).ConfigureAwait(false);
output.WriteLine($"CloseSession status={closeReply.ProtocolStatus.Code} final_state={closeReply.FinalState}");
}
private void LogReply(
string method,
MxCommandReply reply)
{
output.WriteLine(
$"{method} status={reply.ProtocolStatus.Code} hresult={reply.Hresult} diagnostic={reply.DiagnosticMessage}");
foreach (MxStatusProxy status in reply.Statuses)
{
output.WriteLine(
$"{method} mxstatus success={status.Success} category={status.Category} detail={status.Detail} text={status.DiagnosticText}");
}
}
private void LogEvent(MxEvent dataChange)
{
output.WriteLine(
$"Event family={dataChange.Family} worker_sequence={dataChange.WorkerSequence} server_handle={dataChange.ServerHandle} item_handle={dataChange.ItemHandle} quality={dataChange.Quality}");
output.WriteLine(
$"Event value_type={dataChange.Value?.DataType} raw_status={dataChange.RawStatus}");
}
private sealed class GatewayServiceFixture : IAsyncDisposable
{
private readonly GatewayMetrics _metrics = new();
private readonly SessionRegistry _registry = new();
private readonly ILoggerFactory _loggerFactory;
public GatewayServiceFixture(
string workerExecutablePath,
IWorkerProcessFactory processFactory,
ITestOutputHelper output)
{
IOptions<GatewayOptions> options = Options.Create(CreateOptions(workerExecutablePath));
_loggerFactory = LoggerFactory.Create(builder => builder.AddProvider(new TestOutputLoggerProvider(output)));
WorkerProcessLauncher launcher = new(
options,
processFactory,
new WorkerProcessStartedProbe(),
_metrics);
SessionWorkerClientFactory workerClientFactory = new(
launcher,
options,
_metrics,
_loggerFactory);
SessionManager sessionManager = new(
_registry,
workerClientFactory,
options,
_metrics,
logger: _loggerFactory.CreateLogger<SessionManager>());
MxAccessGrpcMapper mapper = new();
EventStreamService eventStreamService = new(
sessionManager,
options,
mapper,
_metrics,
_loggerFactory.CreateLogger<EventStreamService>());
Service = new MxAccessGatewayService(
sessionManager,
new GatewayRequestIdentityAccessor(),
new MxAccessGrpcRequestValidator(),
mapper,
eventStreamService,
_loggerFactory.CreateLogger<MxAccessGatewayService>());
}
public MxAccessGatewayService Service { get; }
public async ValueTask DisposeAsync()
{
foreach (GatewaySession session in _registry.Snapshot())
{
await session.DisposeAsync().ConfigureAwait(false);
}
_loggerFactory.Dispose();
_metrics.Dispose();
}
private static GatewayOptions CreateOptions(string workerExecutablePath)
{
return new GatewayOptions
{
Worker = new WorkerOptions
{
ExecutablePath = workerExecutablePath,
StartupTimeoutSeconds = 30,
ShutdownTimeoutSeconds = 15,
HeartbeatIntervalSeconds = 5,
HeartbeatGraceSeconds = 15,
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
RequiredArchitecture = WorkerArchitecture.X86,
},
Sessions = new SessionOptions
{
DefaultCommandTimeoutSeconds = 15,
MaxSessions = 1,
},
Events = new EventOptions
{
QueueCapacity = 32,
},
};
}
}
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
{
private readonly object syncRoot = new();
private readonly TaskCompletionSource<T> firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly List<T> messages = [];
public IReadOnlyList<T> Messages
{
get
{
lock (syncRoot)
{
return messages.ToArray();
}
}
}
public WriteOptions? WriteOptions { get; set; }
public Task WriteAsync(T message)
{
lock (syncRoot)
{
messages.Add(message);
}
firstMessage.TrySetResult(message);
return Task.CompletedTask;
}
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
{
return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
}
}
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata requestHeaders = [];
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
protected override string HostCore => "localhost";
protected override string PeerCore => "ipv4:127.0.0.1:5000";
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore
{
get => status;
set => status = value;
}
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
protected override IDictionary<object, object> UserStateCore => userState;
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return Task.CompletedTask;
}
protected override ContextPropagationToken CreatePropagationTokenCore(
ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
private sealed class TestWorkerProcessFactory(ITestOutputHelper output) : IWorkerProcessFactory
{
private readonly ConcurrentBag<TestWorkerProcess> processes = [];
public IWorkerProcess Start(ProcessStartInfo startInfo)
{
startInfo.RedirectStandardError = true;
startInfo.RedirectStandardOutput = true;
startInfo.UseShellExecute = false;
Process process = new()
{
StartInfo = startInfo,
EnableRaisingEvents = true,
};
process.OutputDataReceived += (_, args) => WriteWorkerOutput("stdout", args.Data);
process.ErrorDataReceived += (_, args) => WriteWorkerOutput("stderr", args.Data);
if (!process.Start())
{
process.Dispose();
throw new InvalidOperationException("Worker process failed to start.");
}
process.BeginOutputReadLine();
process.BeginErrorReadLine();
TestWorkerProcess workerProcess = new(process);
processes.Add(workerProcess);
output.WriteLine($"WorkerProcess started pid={workerProcess.Id} path={startInfo.FileName}");
return workerProcess;
}
public async Task WaitForProcessesAsync(TimeSpan timeout)
{
foreach (TestWorkerProcess process in processes)
{
if (process.HasExited)
{
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
continue;
}
using CancellationTokenSource timeoutCancellation = new(timeout);
await process.WaitForExitAsync(timeoutCancellation.Token).ConfigureAwait(false);
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
}
}
private void WriteWorkerOutput(
string streamName,
string? line)
{
if (!string.IsNullOrWhiteSpace(line))
{
output.WriteLine($"worker_{streamName}: {line}");
}
}
}
private sealed class TestWorkerProcess(Process process) : IWorkerProcess
{
public int Id => process.Id;
public bool HasExited => process.HasExited;
public int? ExitCode => process.HasExited ? process.ExitCode : null;
public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
}
public void Kill(bool entireProcessTree)
{
process.Kill(entireProcessTree);
}
public void Dispose()
{
process.Dispose();
}
}
private sealed class TestOutputLoggerProvider(ITestOutputHelper output) : ILoggerProvider
{
public ILogger CreateLogger(string categoryName)
{
return new TestOutputLogger(output, categoryName);
}
public void Dispose()
{
}
}
private sealed class TestOutputLogger(
ITestOutputHelper output,
string categoryName) : ILogger
{
public IDisposable? BeginScope<TState>(TState state)
where TState : notnull
{
return null;
}
public bool IsEnabled(LogLevel logLevel)
{
return logLevel >= LogLevel.Information;
}
public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (!IsEnabled(logLevel))
{
return;
}
output.WriteLine($"{logLevel} {categoryName}: {formatter(state, exception)}");
if (exception is not null)
{
output.WriteLine(exception.ToString());
}
}
}
}