using System.Collections.Concurrent; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using ZB.MOM.WW.MxGateway.Contracts; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Configuration; using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Security.Authentication; using ZB.MOM.WW.MxGateway.Server.Security.Authorization; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; using ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes; using ZB.MOM.WW.MxGateway.Tests.TestSupport; namespace ZB.MOM.WW.MxGateway.Tests.Gateway; public sealed class GatewayEndToEndFakeWorkerSmokeTests { private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); private const int ServerHandle = 1001; private const int ItemHandle = 2002; /// /// Verifies gateway session lifecycle with a scripted fake worker: open, command, event, close. /// [Fact] public async Task GatewayService_WithFakeWorker_CompletesSessionCommandEventAndClosePath() { ScriptedFakeWorkerProcessLauncher launcher = new(); await using GatewayServiceFixture fixture = new(launcher); OpenSessionReply openReply = await fixture.Service.OpenSession( new OpenSessionRequest { ClientSessionName = "fake-worker-e2e", ClientCorrelationId = "open-correlation", CommandTimeout = Duration.FromTimeSpan(TestTimeout), }, new TestServerCallContext()); RecordingServerStreamWriter eventWriter = new(); Task streamTask = fixture.Service.StreamEvents( new StreamEventsRequest { SessionId = openReply.SessionId }, eventWriter, new TestServerCallContext()); MxCommandReply registerReply = await fixture.Service.Invoke( CreateRegisterRequest(openReply.SessionId), new TestServerCallContext()); MxCommandReply addItemReply = await fixture.Service.Invoke( CreateAddItemRequest(openReply.SessionId, registerReply.Register.ServerHandle), new TestServerCallContext()); MxCommandReply adviseReply = await fixture.Service.Invoke( CreateAdviseRequest(openReply.SessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), new TestServerCallContext()); MxEvent dataChange = await eventWriter.WaitForFirstMessageAsync(TestTimeout); CloseSessionReply closeReply = await fixture.Service.CloseSession( new CloseSessionRequest { SessionId = openReply.SessionId, ClientCorrelationId = "close-correlation", }, new TestServerCallContext()); await streamTask.WaitAsync(TestTimeout); await launcher.WorkerTask.WaitAsync(TestTimeout); Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); Assert.Equal(GatewayContractInfo.DefaultBackendName, openReply.BackendName); Assert.Equal(ScriptedFakeWorkerProcessLauncher.ProcessId, openReply.WorkerProcessId); Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); Assert.Equal(ServerHandle, registerReply.Register.ServerHandle); Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); Assert.Equal(ItemHandle, addItemReply.AddItem.ItemHandle); Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family); Assert.Equal(openReply.SessionId, dataChange.SessionId); Assert.Equal(ServerHandle, dataChange.ServerHandle); Assert.Equal(ItemHandle, dataChange.ItemHandle); Assert.Equal("scripted-value", dataChange.Value.StringValue); Assert.Equal(ProtocolStatusCode.Ok, closeReply.ProtocolStatus.Code); Assert.Equal(SessionState.Closed, closeReply.FinalState); Assert.True(launcher.Process.HasExited); // MarkExited(0) is reached only after the scripted worker observed a WorkerShutdown // envelope and emitted its WorkerShutdownAck — anything else (a kill, a fault) would // have produced a non-zero exit code, so this pins the shutdown-ack handshake. Assert.Equal(0, launcher.Process.ExitCode); Assert.Equal( [MxCommandKind.Register, MxCommandKind.AddItem, MxCommandKind.Advise], launcher.CommandKinds); } private static MxCommandRequest CreateRegisterRequest(string sessionId) { return new MxCommandRequest { SessionId = sessionId, ClientCorrelationId = "register-correlation", Command = new MxCommand { Kind = MxCommandKind.Register, Register = new RegisterCommand { ClientName = "fake-worker-e2e-client" }, }, }; } private static MxCommandRequest CreateAddItemRequest( string sessionId, int serverHandle) { return new MxCommandRequest { SessionId = sessionId, ClientCorrelationId = "add-item-correlation", Command = new MxCommand { Kind = MxCommandKind.AddItem, AddItem = new AddItemCommand { ServerHandle = serverHandle, ItemDefinition = "Galaxy.Tag.Value", }, }, }; } private static MxCommandRequest CreateAdviseRequest( string sessionId, int serverHandle, int itemHandle) { return new MxCommandRequest { SessionId = sessionId, ClientCorrelationId = "advise-correlation", Command = new MxCommand { Kind = MxCommandKind.Advise, Advise = new AdviseCommand { ServerHandle = serverHandle, ItemHandle = itemHandle, }, }, }; } private sealed class GatewayServiceFixture : IAsyncDisposable { private readonly GatewayMetrics _metrics = new(); private readonly SessionRegistry _registry = new(); /// /// Initializes a new instance of . /// /// Worker process launcher for the fixture. public GatewayServiceFixture(IWorkerProcessLauncher launcher) { IOptions options = Options.Create(CreateOptions()); SessionWorkerClientFactory workerClientFactory = new( launcher, options, _metrics, NullLoggerFactory.Instance); SessionManager sessionManager = new( _registry, workerClientFactory, options, _metrics, logger: NullLogger.Instance); MxAccessGrpcMapper mapper = new(); EventStreamService eventStreamService = new( sessionManager, options, mapper, _metrics, NullDashboardEventBroadcaster.Instance, NullLogger.Instance); Service = new MxAccessGatewayService( sessionManager, new GatewayRequestIdentityAccessor(), new AllowAllConstraintEnforcer(), new MxAccessGrpcRequestValidator(), mapper, eventStreamService, _metrics, NullLogger.Instance, new FakeGatewayAlarmService()); } /// /// Gets the configured gateway service instance. /// public MxAccessGatewayService Service { get; } /// /// Disposes all active sessions and metrics. /// public async ValueTask DisposeAsync() { foreach (GatewaySession session in _registry.Snapshot()) { await session.DisposeAsync(); } _metrics.Dispose(); } private static GatewayOptions CreateOptions() { return new GatewayOptions { Worker = new WorkerOptions { StartupTimeoutSeconds = 5, ShutdownTimeoutSeconds = 5, HeartbeatIntervalSeconds = 30, HeartbeatGraceSeconds = 30, MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, }, Sessions = new SessionOptions { DefaultCommandTimeoutSeconds = 5, MaxSessions = 4, }, Events = new EventOptions { QueueCapacity = 16, }, }; } } private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerProcessLauncher { public const int ProcessId = 4680; private readonly ConcurrentQueue _commandKinds = new(); /// /// Gets the fake worker process instance. /// public FakeWorkerProcess Process { get; } = new(ProcessId); /// /// Gets the collection of command kinds processed by the worker. /// public IReadOnlyCollection CommandKinds => _commandKinds.ToArray(); /// /// Gets the worker's asynchronous task. /// public Task WorkerTask { get; private set; } = Task.CompletedTask; /// /// Launches a new worker process and returns a handle to manage it. /// /// Worker process launch request parameters. /// Cancellation token. /// Worker process handle. public Task LaunchAsync( WorkerProcessLaunchRequest request, CancellationToken cancellationToken = default) { WorkerTask = RunWorkerAsync(request, cancellationToken); return Task.FromResult(new WorkerProcessHandle( Process, new WorkerProcessCommandLine("fake-worker.exe", []), DateTimeOffset.UtcNow)); } private async Task RunWorkerAsync( WorkerProcessLaunchRequest request, CancellationToken cancellationToken) { await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( request.SessionId, request.Nonce, request.PipeName, request.ProtocolVersion, cancellationToken: cancellationToken).ConfigureAwait(false); await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false); while (!cancellationToken.IsCancellationRequested) { WorkerEnvelope envelope = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand) { await ReplyToCommandAsync(harness, envelope, cancellationToken).ConfigureAwait(false); continue; } if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown) { await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); Process.MarkExited(0); return; } throw new InvalidOperationException($"Unexpected gateway envelope {envelope.BodyCase}."); } } private async Task ReplyToCommandAsync( FakeWorkerHarness harness, WorkerEnvelope commandEnvelope, CancellationToken cancellationToken) { MxCommand command = commandEnvelope.WorkerCommand.Command; _commandKinds.Enqueue(command.Kind); await harness.ReplyToCommandAsync( commandEnvelope, configureReply: reply => ConfigureReply(reply, command.Kind), cancellationToken: cancellationToken).ConfigureAwait(false); if (command.Kind == MxCommandKind.Advise) { await harness.EmitEventAsync( MxEventFamily.OnDataChange, cancellationToken, mxEvent => { mxEvent.ServerHandle = command.Advise.ServerHandle; mxEvent.ItemHandle = command.Advise.ItemHandle; mxEvent.Quality = 192; mxEvent.Value = new MxValue { DataType = MxDataType.String, StringValue = "scripted-value", }; mxEvent.OnDataChange = new OnDataChangeEvent(); }).ConfigureAwait(false); } } private static void ConfigureReply( MxCommandReply reply, MxCommandKind kind) { switch (kind) { case MxCommandKind.Register: reply.Register = new RegisterReply { ServerHandle = ServerHandle }; break; case MxCommandKind.AddItem: reply.AddItem = new AddItemReply { ItemHandle = ItemHandle }; break; } } } private sealed class FakeWorkerProcess(int processId) : IWorkerProcess { private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously); /// /// Gets the process identifier. /// public int Id { get; } = processId; /// /// Gets a value indicating whether the process has exited. /// public bool HasExited { get; private set; } /// /// Gets the exit code of the process. /// public int? ExitCode { get; private set; } /// /// Waits for the process to exit asynchronously. Completes only when /// or has been called, so callers that observe completion can /// trust that exit actually happened (e.g., via the worker shutdown-ack path). /// /// Cancellation token. /// A task that completes when the process has actually exited. public ValueTask WaitForExitAsync(CancellationToken cancellationToken) { return new ValueTask(_exited.Task.WaitAsync(cancellationToken)); } /// /// Terminates the process. /// /// Whether to kill the entire process tree. public void Kill(bool entireProcessTree) { MarkExited(-1); } /// /// Releases resources used by this process. /// public void Dispose() { } /// /// Marks the process as exited with the specified exit code. /// /// The process exit code. public void MarkExited(int exitCode) { HasExited = true; ExitCode = exitCode; _exited.TrySetResult(); } } }