diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index 1300af4..37a3e78 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -288,6 +288,212 @@ public sealed class WorkerPipeSessionTests await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); } + /// + /// Verifies that a Ping control command is answered on the worker side + /// (not dispatched to the STA) with an OK reply that echoes the ping + /// message into the reply's diagnostic field. + /// + [Fact] + public async Task RunAsync_PingControlCommand_RepliesOkAndEchoesMessage() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync(CreatePingCommandEnvelope("ping-1", "hello-worker"), cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal("ping-1", reply.CorrelationId); + Assert.Equal(MxCommandKind.Ping, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal("hello-worker", reply.DiagnosticMessage); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that GetSessionState reports the worker's lifecycle as the + /// proto SessionState — READY while the message loop is serving. + /// + [Fact] + public async Task RunAsync_GetSessionStateControlCommand_RepliesReady() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "state-1", + MxCommandKind.GetSessionState, + command => command.GetSessionState = new GetSessionStateCommand()), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal(MxCommandKind.GetSessionState, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal(SessionState.Ready, reply.SessionState.State); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that GetWorkerInfo populates the worker process id, version, + /// and MXAccess ProgID/CLSID from the worker's own metadata. + /// + [Fact] + public async Task RunAsync_GetWorkerInfoControlCommand_PopulatesWorkerInfoFields() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "info-1", + MxCommandKind.GetWorkerInfo, + command => command.GetWorkerInfo = new GetWorkerInfoCommand()), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal(MxCommandKind.GetWorkerInfo, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + WorkerInfoReply info = reply.WorkerInfo; + Assert.Equal(1234, info.WorkerProcessId); + Assert.False(string.IsNullOrEmpty(info.WorkerVersion)); + Assert.Equal(MxAccessInteropInfo.ProgId, info.MxaccessProgid); + Assert.Equal(MxAccessInteropInfo.Clsid, info.MxaccessClsid); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that DrainEvents drains the runtime session's queued events + /// into the reply rather than streaming them as WorkerEvent envelopes. + /// + [Fact] + public async Task RunAsync_DrainEventsControlCommand_ReturnsQueuedEvents() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + // Suppress the background drain loop's fixed-batch drains so the + // queued events survive for the explicit DrainEvents command (which + // drains all via max_events == 0). 128 mirrors + // WorkerPipeSession.EventDrainBatchSize. + FakeRuntimeSession runtime = new() { SuppressDrainForBatchSize = 128 }; + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + runtime.EnqueueEvent(CreateWorkerEvent(sequence: 11)); + runtime.EnqueueEvent(CreateWorkerEvent(sequence: 12)); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "drain-1", + MxCommandKind.DrainEvents, + command => command.DrainEvents = new DrainEventsCommand { MaxEvents = 0 }), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal(MxCommandKind.DrainEvents, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal(2, reply.DrainEvents.Events.Count); + Assert.Contains(reply.DrainEvents.Events, e => e.WorkerSequence == 11UL); + Assert.Contains(reply.DrainEvents.Events, e => e.WorkerSequence == 12UL); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that ShutdownWorker returns its OK reply BEFORE the graceful + /// shutdown runs and disposes the runtime session, and that the message + /// loop then stops. + /// + [Fact] + public async Task RunAsync_ShutdownWorkerControlCommand_RepliesOkThenShutsDown() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "shutdown-1", + MxCommandKind.ShutdownWorker, + command => command.ShutdownWorker = new ShutdownWorkerCommand + { + GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + }), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal("shutdown-1", reply.CorrelationId); + Assert.Equal(MxCommandKind.ShutdownWorker, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + + // The OK reply is followed by a shutdown ack, then the loop stops and + // the runtime session is disposed. + WorkerEnvelope ack = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, + cancellation.Token); + Assert.Equal(ProtocolStatusCode.Ok, ack.WorkerShutdownAck.Status.Code); + + Task completedTask = await Task + .WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(5), cancellation.Token)) + .ConfigureAwait(false); + Assert.Same(runTask, completedTask); + await runTask.ConfigureAwait(false); + Assert.True(runtime.Disposed, "ShutdownWorker must dispose the runtime session."); + } + /// /// Verifies that stale STA activity with no command in flight triggers @@ -939,6 +1145,40 @@ public sealed class WorkerPipeSessionTests }; } + private static WorkerEnvelope CreatePingCommandEnvelope( + string correlationId, + string message, + ulong sequence = 2) + { + return CreateControlCommandEnvelope( + correlationId, + MxCommandKind.Ping, + command => command.Ping = new PingCommand { Message = message }, + sequence); + } + + private static WorkerEnvelope CreateControlCommandEnvelope( + string correlationId, + MxCommandKind kind, + Action configurePayload, + ulong sequence = 2) + { + MxCommand command = new() { Kind = kind }; + configurePayload(command); + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = SessionId, + Sequence = sequence, + CorrelationId = correlationId, + WorkerCommand = new WorkerCommand + { + Command = command, + EnqueueTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }; + } + private static WorkerEnvelope CreateCancelEnvelope(string correlationId, ulong sequence = 2) { return new WorkerEnvelope diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs index e361819..7bd6ea7 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs @@ -122,11 +122,26 @@ internal sealed class FakeRuntimeSession : IWorkerRuntimeSession } } + /// + /// When set, returns no events for the + /// WorkerPipeSession background drain loop's fixed batch size, so an + /// explicit DrainEvents control command (which drains all via + /// maxEvents == 0) can claim the queued events deterministically + /// without racing the 25 ms background loop. Mirrors + /// WorkerPipeSession.EventDrainBatchSize. + /// + public uint? SuppressDrainForBatchSize { get; set; } + /// Drains queued events up to the specified limit. /// Maximum events to drain; 0 drains all. /// The drained events. public IReadOnlyList DrainEvents(uint maxEvents) { + if (SuppressDrainForBatchSize is uint suppressed && maxEvents == suppressed) + { + return Array.Empty(); + } + lock (gate) { int drainCount = maxEvents == 0 diff --git a/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs index 238b77a..de4f199 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -378,6 +378,22 @@ public sealed class WorkerPipeSession switch (envelope.BodyCase) { case WorkerEnvelope.BodyOneofCase.WorkerCommand: + // Worker control/lifecycle commands (Ping, GetSessionState, + // GetWorkerInfo, DrainEvents, ShutdownWorker) are answered here + // on the message-loop thread instead of being dispatched onto + // the STA. Their replies are built from process-level state + // (worker process id, assembly version, _state, the runtime + // session's event queue) that the STA-bound + // MxAccessCommandExecutor cannot see, and ShutdownWorker must + // return its OK reply BEFORE the graceful shutdown joins the + // STA thread — running it on the STA would deadlock. Returning + // false from the ShutdownWorker arm stops the read loop exactly + // as a WorkerShutdown envelope would. + if (IsControlCommand(envelope.WorkerCommand?.Command?.Kind ?? MxCommandKind.Unspecified)) + { + return await HandleControlCommandAsync(envelope, cancellationToken).ConfigureAwait(false); + } + TryStartCommandTask(envelope, cancellationToken); return true; case WorkerEnvelope.BodyOneofCase.WorkerShutdown: @@ -393,6 +409,175 @@ public sealed class WorkerPipeSession } } + private static bool IsControlCommand(MxCommandKind kind) + { + return kind switch + { + MxCommandKind.Ping => true, + MxCommandKind.GetSessionState => true, + MxCommandKind.GetWorkerInfo => true, + MxCommandKind.DrainEvents => true, + MxCommandKind.ShutdownWorker => true, + _ => false, + }; + } + + /// + /// Answers a worker control/lifecycle command on the message-loop + /// thread (never on the STA). Returns false only for + /// — after writing its OK + /// reply this drives the same graceful-shutdown path a + /// WorkerShutdown envelope would, then signals the read loop to + /// stop. All other control commands return true to keep reading. + /// + private async Task HandleControlCommandAsync( + WorkerEnvelope envelope, + CancellationToken cancellationToken) + { + WorkerCommand workerCommand = envelope.WorkerCommand; + MxCommand command = workerCommand.Command; + string correlationId = envelope.CorrelationId; + + if (command.Kind == MxCommandKind.ShutdownWorker) + { + // Build and emit the OK reply BEFORE triggering shutdown so the + // gateway's correlation-id wait is satisfied even though the + // graceful shutdown below tears the session (and pipe) down. + MxCommandReply shutdownReply = CreateControlOkReply(correlationId, command.Kind); + await WriteControlReplyAsync(shutdownReply, cancellationToken).ConfigureAwait(false); + + WorkerShutdown shutdown = new(); + if (command.ShutdownWorker?.GracePeriod is not null) + { + shutdown.GracePeriod = command.ShutdownWorker.GracePeriod; + } + + shutdown.Reason = "ShutdownWorker command"; + await ShutdownAsync(shutdown, cancellationToken).ConfigureAwait(false); + return false; + } + + MxCommandReply reply = command.Kind switch + { + MxCommandKind.Ping => CreatePingReply(correlationId, command), + MxCommandKind.GetSessionState => CreateSessionStateReply(correlationId, command.Kind), + MxCommandKind.GetWorkerInfo => CreateWorkerInfoReply(correlationId, command.Kind), + MxCommandKind.DrainEvents => CreateDrainEventsReply(correlationId, command), + _ => CreateControlOkReply(correlationId, command.Kind), + }; + + await WriteControlReplyAsync(reply, cancellationToken).ConfigureAwait(false); + return true; + } + + private Task WriteControlReplyAsync( + MxCommandReply reply, + CancellationToken cancellationToken) + { + return _writer.WriteAsync( + CreateEnvelope(new WorkerCommandReply + { + Reply = reply, + CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }), + cancellationToken); + } + + private MxCommandReply CreatePingReply(string correlationId, MxCommand command) + { + MxCommandReply reply = CreateControlOkReply(correlationId, command.Kind); + + // Echo the ping message back through the base reply's diagnostic + // message field (there is no dedicated PingReply payload). An empty + // message leaves the diagnostic field at its proto3 default. + string? message = command.Ping?.Message; + if (!string.IsNullOrEmpty(message)) + { + reply.DiagnosticMessage = message; + } + + return reply; + } + + private MxCommandReply CreateSessionStateReply(string correlationId, MxCommandKind kind) + { + MxCommandReply reply = CreateControlOkReply(correlationId, kind); + reply.SessionState = new SessionStateReply + { + State = MapWorkerStateToSessionState(_state), + }; + return reply; + } + + private MxCommandReply CreateWorkerInfoReply(string correlationId, MxCommandKind kind) + { + MxCommandReply reply = CreateControlOkReply(correlationId, kind); + reply.WorkerInfo = new WorkerInfoReply + { + WorkerProcessId = _processIdProvider(), + WorkerVersion = typeof(WorkerPipeSession).Assembly.GetName().Version?.ToString() ?? string.Empty, + MxaccessProgid = MxAccessInteropInfo.ProgId, + MxaccessClsid = MxAccessInteropInfo.Clsid, + }; + return reply; + } + + private MxCommandReply CreateDrainEventsReply(string correlationId, MxCommand command) + { + MxCommandReply reply = CreateControlOkReply(correlationId, command.Kind); + DrainEventsReply drainReply = new(); + + IWorkerRuntimeSession? runtimeSession = _runtimeSession; + if (runtimeSession is not null) + { + uint maxEvents = command.DrainEvents?.MaxEvents ?? 0; + foreach (WorkerEvent workerEvent in runtimeSession.DrainEvents(maxEvents)) + { + if (workerEvent.Event is not null) + { + drainReply.Events.Add(workerEvent.Event); + } + } + } + + reply.DrainEvents = drainReply; + return reply; + } + + private MxCommandReply CreateControlOkReply(string correlationId, MxCommandKind kind) + { + return new MxCommandReply + { + SessionId = _options.SessionId, + CorrelationId = correlationId, + Kind = kind, + Hresult = 0, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }; + } + + private static SessionState MapWorkerStateToSessionState(WorkerState state) + { + return state switch + { + WorkerState.Starting => SessionState.StartingWorker, + WorkerState.Handshaking => SessionState.Handshaking, + WorkerState.InitializingSta => SessionState.InitializingWorker, + WorkerState.Ready => SessionState.Ready, + // A control command is being served, so the STA is alive and + // ready — the busy state is incidental, not a distinct lifecycle. + WorkerState.ExecutingCommand => SessionState.Ready, + WorkerState.ShuttingDown => SessionState.Closing, + WorkerState.Stopped => SessionState.Closed, + WorkerState.Faulted => SessionState.Faulted, + _ => SessionState.Unspecified, + }; + } + private async Task ProcessCommandAsync( WorkerEnvelope envelope, CancellationToken cancellationToken)