Merge pull request #79 from agent-3/issue-19-gateway-e2e-smoke-with-fake-worker
Issue #19: gateway end-to-end smoke with fake worker
This commit was merged in pull request #79.
This commit is contained in:
@@ -28,6 +28,12 @@ Session-level tests can connect the harness to the pipe created by
|
||||
`WorkerClient` tests can use `CreateConnectedPairAsync` to create both pipe ends
|
||||
inside the test.
|
||||
|
||||
`GatewayEndToEndFakeWorkerSmokeTests` composes the real gRPC service,
|
||||
`SessionManager`, `SessionWorkerClientFactory`, `WorkerClient`, and
|
||||
`EventStreamService` with a scripted fake worker launcher. The smoke test covers
|
||||
`OpenSession`, `Register`, `AddItem`, `Advise`, one streamed `OnDataChange`
|
||||
event, and `CloseSession` without loading MXAccess COM.
|
||||
|
||||
## Focused Commands
|
||||
|
||||
Run the fake worker tests after changing gateway worker IPC, session startup, or
|
||||
@@ -36,6 +42,7 @@ event streaming behavior:
|
||||
```bash
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~FakeWorkerHarnessTests
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~GatewayEndToEndFakeWorkerSmokeTests
|
||||
```
|
||||
|
||||
Run the gateway test project after shared gateway test infrastructure changes:
|
||||
|
||||
@@ -0,0 +1,439 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Grpc;
|
||||
using MxGateway.Server.Metrics;
|
||||
using MxGateway.Server.Security.Authorization;
|
||||
using MxGateway.Server.Sessions;
|
||||
using MxGateway.Server.Workers;
|
||||
using MxGateway.Tests.Gateway.Workers.Fakes;
|
||||
|
||||
namespace MxGateway.Tests.Gateway;
|
||||
|
||||
public sealed class GatewayEndToEndFakeWorkerSmokeTests
|
||||
{
|
||||
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
|
||||
private const int ServerHandle = 1001;
|
||||
private const int ItemHandle = 2002;
|
||||
|
||||
[Fact]
|
||||
public async Task GatewayService_WithFakeWorker_CompletesSessionCommandEventAndClosePath()
|
||||
{
|
||||
ScriptedFakeWorkerProcessLauncher launcher = new();
|
||||
await using GatewayServiceFixture fixture = new(launcher);
|
||||
|
||||
OpenSessionReply openReply = await fixture.Service.OpenSession(
|
||||
new OpenSessionRequest
|
||||
{
|
||||
ClientSessionName = "fake-worker-e2e",
|
||||
ClientCorrelationId = "open-correlation",
|
||||
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
|
||||
},
|
||||
new TestServerCallContext());
|
||||
|
||||
RecordingServerStreamWriter<MxEvent> eventWriter = new();
|
||||
Task streamTask = fixture.Service.StreamEvents(
|
||||
new StreamEventsRequest { SessionId = openReply.SessionId },
|
||||
eventWriter,
|
||||
new TestServerCallContext());
|
||||
|
||||
MxCommandReply registerReply = await fixture.Service.Invoke(
|
||||
CreateRegisterRequest(openReply.SessionId),
|
||||
new TestServerCallContext());
|
||||
MxCommandReply addItemReply = await fixture.Service.Invoke(
|
||||
CreateAddItemRequest(openReply.SessionId, registerReply.Register.ServerHandle),
|
||||
new TestServerCallContext());
|
||||
MxCommandReply adviseReply = await fixture.Service.Invoke(
|
||||
CreateAdviseRequest(openReply.SessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle),
|
||||
new TestServerCallContext());
|
||||
|
||||
MxEvent dataChange = await eventWriter.WaitForFirstMessageAsync(TestTimeout);
|
||||
|
||||
CloseSessionReply closeReply = await fixture.Service.CloseSession(
|
||||
new CloseSessionRequest
|
||||
{
|
||||
SessionId = openReply.SessionId,
|
||||
ClientCorrelationId = "close-correlation",
|
||||
},
|
||||
new TestServerCallContext());
|
||||
|
||||
await streamTask.WaitAsync(TestTimeout);
|
||||
await launcher.WorkerTask.WaitAsync(TestTimeout);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
|
||||
Assert.Equal(GatewayContractInfo.DefaultBackendName, openReply.BackendName);
|
||||
Assert.Equal(ScriptedFakeWorkerProcessLauncher.ProcessId, openReply.WorkerProcessId);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
|
||||
Assert.Equal(ServerHandle, registerReply.Register.ServerHandle);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
|
||||
Assert.Equal(ItemHandle, addItemReply.AddItem.ItemHandle);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
|
||||
Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family);
|
||||
Assert.Equal(openReply.SessionId, dataChange.SessionId);
|
||||
Assert.Equal(ServerHandle, dataChange.ServerHandle);
|
||||
Assert.Equal(ItemHandle, dataChange.ItemHandle);
|
||||
Assert.Equal("scripted-value", dataChange.Value.StringValue);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, closeReply.ProtocolStatus.Code);
|
||||
Assert.Equal(SessionState.Closed, closeReply.FinalState);
|
||||
Assert.True(launcher.Process.HasExited);
|
||||
Assert.Equal(
|
||||
[MxCommandKind.Register, MxCommandKind.AddItem, MxCommandKind.Advise],
|
||||
launcher.CommandKinds);
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateRegisterRequest(string sessionId)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "register-correlation",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Register,
|
||||
Register = new RegisterCommand { ClientName = "fake-worker-e2e-client" },
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateAddItemRequest(
|
||||
string sessionId,
|
||||
int serverHandle)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "add-item-correlation",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AddItem,
|
||||
AddItem = new AddItemCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemDefinition = "Galaxy.Tag.Value",
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static MxCommandRequest CreateAdviseRequest(
|
||||
string sessionId,
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
return new MxCommandRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = "advise-correlation",
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Advise,
|
||||
Advise = new AdviseCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private sealed class GatewayServiceFixture : IAsyncDisposable
|
||||
{
|
||||
private readonly GatewayMetrics _metrics = new();
|
||||
private readonly SessionRegistry _registry = new();
|
||||
|
||||
public GatewayServiceFixture(IWorkerProcessLauncher launcher)
|
||||
{
|
||||
IOptions<GatewayOptions> options = Options.Create(CreateOptions());
|
||||
SessionWorkerClientFactory workerClientFactory = new(
|
||||
launcher,
|
||||
options,
|
||||
_metrics,
|
||||
NullLoggerFactory.Instance);
|
||||
SessionManager sessionManager = new(
|
||||
_registry,
|
||||
workerClientFactory,
|
||||
options,
|
||||
_metrics,
|
||||
logger: NullLogger<SessionManager>.Instance);
|
||||
MxAccessGrpcMapper mapper = new();
|
||||
EventStreamService eventStreamService = new(
|
||||
sessionManager,
|
||||
options,
|
||||
mapper,
|
||||
_metrics,
|
||||
NullLogger<EventStreamService>.Instance);
|
||||
|
||||
Service = new MxAccessGatewayService(
|
||||
sessionManager,
|
||||
new GatewayRequestIdentityAccessor(),
|
||||
new MxAccessGrpcRequestValidator(),
|
||||
mapper,
|
||||
eventStreamService,
|
||||
NullLogger<MxAccessGatewayService>.Instance);
|
||||
}
|
||||
|
||||
public MxAccessGatewayService Service { get; }
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
foreach (GatewaySession session in _registry.Snapshot())
|
||||
{
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
_metrics.Dispose();
|
||||
}
|
||||
|
||||
private static GatewayOptions CreateOptions()
|
||||
{
|
||||
return new GatewayOptions
|
||||
{
|
||||
Worker = new WorkerOptions
|
||||
{
|
||||
StartupTimeoutSeconds = 5,
|
||||
ShutdownTimeoutSeconds = 5,
|
||||
HeartbeatIntervalSeconds = 30,
|
||||
HeartbeatGraceSeconds = 30,
|
||||
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
|
||||
},
|
||||
Sessions = new SessionOptions
|
||||
{
|
||||
DefaultCommandTimeoutSeconds = 5,
|
||||
MaxSessions = 4,
|
||||
},
|
||||
Events = new EventOptions
|
||||
{
|
||||
QueueCapacity = 16,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerProcessLauncher
|
||||
{
|
||||
public const int ProcessId = 4680;
|
||||
private readonly ConcurrentQueue<MxCommandKind> _commandKinds = new();
|
||||
|
||||
public FakeWorkerProcess Process { get; } = new(ProcessId);
|
||||
|
||||
public IReadOnlyCollection<MxCommandKind> CommandKinds => _commandKinds.ToArray();
|
||||
|
||||
public Task WorkerTask { get; private set; } = Task.CompletedTask;
|
||||
|
||||
public Task<WorkerProcessHandle> LaunchAsync(
|
||||
WorkerProcessLaunchRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
WorkerTask = RunWorkerAsync(request, cancellationToken);
|
||||
|
||||
return Task.FromResult(new WorkerProcessHandle(
|
||||
Process,
|
||||
new WorkerProcessCommandLine("fake-worker.exe", []),
|
||||
DateTimeOffset.UtcNow));
|
||||
}
|
||||
|
||||
private async Task RunWorkerAsync(
|
||||
WorkerProcessLaunchRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
|
||||
request.SessionId,
|
||||
request.Nonce,
|
||||
request.PipeName,
|
||||
request.ProtocolVersion,
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
WorkerEnvelope envelope = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand)
|
||||
{
|
||||
await ReplyToCommandAsync(harness, envelope, cancellationToken).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
|
||||
{
|
||||
await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
Process.MarkExited(0);
|
||||
return;
|
||||
}
|
||||
|
||||
throw new InvalidOperationException($"Unexpected gateway envelope {envelope.BodyCase}.");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReplyToCommandAsync(
|
||||
FakeWorkerHarness harness,
|
||||
WorkerEnvelope commandEnvelope,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
MxCommand command = commandEnvelope.WorkerCommand.Command;
|
||||
_commandKinds.Enqueue(command.Kind);
|
||||
|
||||
await harness.ReplyToCommandAsync(
|
||||
commandEnvelope,
|
||||
configureReply: reply => ConfigureReply(reply, command.Kind),
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (command.Kind == MxCommandKind.Advise)
|
||||
{
|
||||
await harness.EmitEventAsync(
|
||||
MxEventFamily.OnDataChange,
|
||||
cancellationToken,
|
||||
mxEvent =>
|
||||
{
|
||||
mxEvent.ServerHandle = command.Advise.ServerHandle;
|
||||
mxEvent.ItemHandle = command.Advise.ItemHandle;
|
||||
mxEvent.Quality = 192;
|
||||
mxEvent.Value = new MxValue
|
||||
{
|
||||
DataType = MxDataType.String,
|
||||
StringValue = "scripted-value",
|
||||
};
|
||||
mxEvent.OnDataChange = new OnDataChangeEvent();
|
||||
}).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private static void ConfigureReply(
|
||||
MxCommandReply reply,
|
||||
MxCommandKind kind)
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case MxCommandKind.Register:
|
||||
reply.Register = new RegisterReply { ServerHandle = ServerHandle };
|
||||
break;
|
||||
case MxCommandKind.AddItem:
|
||||
reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
|
||||
{
|
||||
public int Id { get; } = processId;
|
||||
|
||||
public bool HasExited { get; private set; }
|
||||
|
||||
public int? ExitCode { get; private set; }
|
||||
|
||||
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
HasExited = true;
|
||||
ExitCode ??= 0;
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
public void Kill(bool entireProcessTree)
|
||||
{
|
||||
MarkExited(-1);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
|
||||
public void MarkExited(int exitCode)
|
||||
{
|
||||
HasExited = true;
|
||||
ExitCode = exitCode;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
|
||||
{
|
||||
private readonly object _syncRoot = new();
|
||||
private readonly TaskCompletionSource<T> _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly List<T> _messages = [];
|
||||
|
||||
public IReadOnlyList<T> Messages
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
return _messages.ToArray();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public WriteOptions? WriteOptions { get; set; }
|
||||
|
||||
public Task WriteAsync(T message)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
_messages.Add(message);
|
||||
}
|
||||
|
||||
_firstMessage.TrySetResult(message);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
|
||||
{
|
||||
return await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
|
||||
{
|
||||
private readonly Metadata _requestHeaders = [];
|
||||
private readonly Metadata _responseTrailers = [];
|
||||
private readonly Dictionary<object, object> _userState = [];
|
||||
private Status _status;
|
||||
private WriteOptions? _writeOptions;
|
||||
|
||||
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
|
||||
|
||||
protected override string HostCore => "localhost";
|
||||
|
||||
protected override string PeerCore => "ipv4:127.0.0.1:5000";
|
||||
|
||||
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
|
||||
|
||||
protected override Metadata RequestHeadersCore => _requestHeaders;
|
||||
|
||||
protected override CancellationToken CancellationTokenCore => cancellationToken;
|
||||
|
||||
protected override Metadata ResponseTrailersCore => _responseTrailers;
|
||||
|
||||
protected override Status StatusCore
|
||||
{
|
||||
get => _status;
|
||||
set => _status = value;
|
||||
}
|
||||
|
||||
protected override WriteOptions? WriteOptionsCore
|
||||
{
|
||||
get => _writeOptions;
|
||||
set => _writeOptions = value;
|
||||
}
|
||||
|
||||
protected override AuthContext AuthContextCore { get; } = new(
|
||||
string.Empty,
|
||||
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
|
||||
|
||||
protected override IDictionary<object, object> UserStateCore => _userState;
|
||||
|
||||
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
protected override ContextPropagationToken CreatePropagationTokenCore(
|
||||
ContextPropagationOptions? options)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -205,6 +205,7 @@ public sealed class FakeWorkerHarness : IAsyncDisposable
|
||||
WorkerEnvelope commandEnvelope,
|
||||
ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
|
||||
string statusMessage = "OK",
|
||||
Action<MxCommandReply>? configureReply = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
|
||||
@@ -213,22 +214,25 @@ public sealed class FakeWorkerHarness : IAsyncDisposable
|
||||
}
|
||||
|
||||
MxCommandKind kind = commandEnvelope.WorkerCommand.Command?.Kind ?? MxCommandKind.Unspecified;
|
||||
MxCommandReply reply = new()
|
||||
{
|
||||
SessionId = SessionId,
|
||||
CorrelationId = commandEnvelope.CorrelationId,
|
||||
Kind = kind,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = statusCode,
|
||||
Message = statusMessage,
|
||||
},
|
||||
};
|
||||
configureReply?.Invoke(reply);
|
||||
|
||||
await _writer.WriteAsync(
|
||||
CreateEnvelope(
|
||||
commandEnvelope.CorrelationId,
|
||||
envelope => envelope.WorkerCommandReply = new WorkerCommandReply
|
||||
{
|
||||
Reply = new MxCommandReply
|
||||
{
|
||||
SessionId = SessionId,
|
||||
CorrelationId = commandEnvelope.CorrelationId,
|
||||
Kind = kind,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = statusCode,
|
||||
Message = statusMessage,
|
||||
},
|
||||
},
|
||||
Reply = reply,
|
||||
CompletedTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
}),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
@@ -236,21 +240,25 @@ public sealed class FakeWorkerHarness : IAsyncDisposable
|
||||
|
||||
public async Task EmitEventAsync(
|
||||
MxEventFamily family,
|
||||
CancellationToken cancellationToken = default)
|
||||
CancellationToken cancellationToken = default,
|
||||
Action<MxEvent>? configureEvent = null)
|
||||
{
|
||||
ulong sequence = NextWorkerSequence + 1;
|
||||
MxEvent mxEvent = new()
|
||||
{
|
||||
SessionId = SessionId,
|
||||
Family = family,
|
||||
WorkerSequence = sequence,
|
||||
WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
};
|
||||
configureEvent?.Invoke(mxEvent);
|
||||
|
||||
await _writer.WriteAsync(
|
||||
CreateEnvelope(
|
||||
correlationId: string.Empty,
|
||||
envelope => envelope.WorkerEvent = new WorkerEvent
|
||||
{
|
||||
Event = new MxEvent
|
||||
{
|
||||
SessionId = SessionId,
|
||||
Family = family,
|
||||
WorkerSequence = sequence,
|
||||
WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
},
|
||||
Event = mxEvent,
|
||||
}),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user