From bf72cd89615d03810a2a756a02ec358f88f81f41 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 10:20:51 -0400 Subject: [PATCH] feat(worker): implement Ping/GetSessionState/GetWorkerInfo/DrainEvents/ShutdownWorker control commands Answer the five worker control/lifecycle commands at the WorkerPipeSession message-loop layer instead of the STA-bound MxAccessCommandExecutor. These replies are built from process-level state (worker pid, assembly version, worker lifecycle, the runtime session's event queue) the executor cannot see, and ShutdownWorker must emit its OK reply before the graceful shutdown joins the STA thread - dispatching it onto the STA would deadlock. - Ping: OK reply, echoes message into diagnostic_message. - GetSessionState: maps WorkerState to proto SessionState. - GetWorkerInfo: pid, worker version, MXAccess ProgID/CLSID. - DrainEvents: drains the runtime event queue into DrainEventsReply. - ShutdownWorker: OK reply, then graceful shutdown, then stops the loop. Tests added in WorkerPipeSessionTests; FakeRuntimeSession gains a batch-size drain suppressor so DrainEvents does not race the background drain loop. --- .../Ipc/WorkerPipeSessionTests.cs | 240 ++++++++++++++++++ .../TestSupport/FakeRuntimeSession.cs | 15 ++ .../Ipc/WorkerPipeSession.cs | 185 ++++++++++++++ 3 files changed, 440 insertions(+) diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index 1300af4..37a3e78 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -288,6 +288,212 @@ public sealed class WorkerPipeSessionTests await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); } + /// + /// Verifies that a Ping control command is answered on the worker side + /// (not dispatched to the STA) with an OK reply that echoes the ping + /// message into the reply's diagnostic field. + /// + [Fact] + public async Task RunAsync_PingControlCommand_RepliesOkAndEchoesMessage() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync(CreatePingCommandEnvelope("ping-1", "hello-worker"), cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal("ping-1", reply.CorrelationId); + Assert.Equal(MxCommandKind.Ping, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal("hello-worker", reply.DiagnosticMessage); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that GetSessionState reports the worker's lifecycle as the + /// proto SessionState — READY while the message loop is serving. + /// + [Fact] + public async Task RunAsync_GetSessionStateControlCommand_RepliesReady() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "state-1", + MxCommandKind.GetSessionState, + command => command.GetSessionState = new GetSessionStateCommand()), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal(MxCommandKind.GetSessionState, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal(SessionState.Ready, reply.SessionState.State); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that GetWorkerInfo populates the worker process id, version, + /// and MXAccess ProgID/CLSID from the worker's own metadata. + /// + [Fact] + public async Task RunAsync_GetWorkerInfoControlCommand_PopulatesWorkerInfoFields() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "info-1", + MxCommandKind.GetWorkerInfo, + command => command.GetWorkerInfo = new GetWorkerInfoCommand()), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal(MxCommandKind.GetWorkerInfo, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + WorkerInfoReply info = reply.WorkerInfo; + Assert.Equal(1234, info.WorkerProcessId); + Assert.False(string.IsNullOrEmpty(info.WorkerVersion)); + Assert.Equal(MxAccessInteropInfo.ProgId, info.MxaccessProgid); + Assert.Equal(MxAccessInteropInfo.Clsid, info.MxaccessClsid); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that DrainEvents drains the runtime session's queued events + /// into the reply rather than streaming them as WorkerEvent envelopes. + /// + [Fact] + public async Task RunAsync_DrainEventsControlCommand_ReturnsQueuedEvents() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + // Suppress the background drain loop's fixed-batch drains so the + // queued events survive for the explicit DrainEvents command (which + // drains all via max_events == 0). 128 mirrors + // WorkerPipeSession.EventDrainBatchSize. + FakeRuntimeSession runtime = new() { SuppressDrainForBatchSize = 128 }; + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + runtime.EnqueueEvent(CreateWorkerEvent(sequence: 11)); + runtime.EnqueueEvent(CreateWorkerEvent(sequence: 12)); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "drain-1", + MxCommandKind.DrainEvents, + command => command.DrainEvents = new DrainEventsCommand { MaxEvents = 0 }), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal(MxCommandKind.DrainEvents, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal(2, reply.DrainEvents.Events.Count); + Assert.Contains(reply.DrainEvents.Events, e => e.WorkerSequence == 11UL); + Assert.Contains(reply.DrainEvents.Events, e => e.WorkerSequence == 12UL); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + /// + /// Verifies that ShutdownWorker returns its OK reply BEFORE the graceful + /// shutdown runs and disposes the runtime session, and that the message + /// loop then stops. + /// + [Fact] + public async Task RunAsync_ShutdownWorkerControlCommand_RepliesOkThenShutsDown() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession(pipePair.WorkerStream, runtime); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter + .WriteAsync( + CreateControlCommandEnvelope( + "shutdown-1", + MxCommandKind.ShutdownWorker, + command => command.ShutdownWorker = new ShutdownWorkerCommand + { + GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + }), + cancellation.Token) + .ConfigureAwait(false); + + WorkerEnvelope replyEnvelope = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + MxCommandReply reply = replyEnvelope.WorkerCommandReply.Reply; + Assert.Equal("shutdown-1", reply.CorrelationId); + Assert.Equal(MxCommandKind.ShutdownWorker, reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + + // The OK reply is followed by a shutdown ack, then the loop stops and + // the runtime session is disposed. + WorkerEnvelope ack = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, + cancellation.Token); + Assert.Equal(ProtocolStatusCode.Ok, ack.WorkerShutdownAck.Status.Code); + + Task completedTask = await Task + .WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(5), cancellation.Token)) + .ConfigureAwait(false); + Assert.Same(runTask, completedTask); + await runTask.ConfigureAwait(false); + Assert.True(runtime.Disposed, "ShutdownWorker must dispose the runtime session."); + } + /// /// Verifies that stale STA activity with no command in flight triggers @@ -939,6 +1145,40 @@ public sealed class WorkerPipeSessionTests }; } + private static WorkerEnvelope CreatePingCommandEnvelope( + string correlationId, + string message, + ulong sequence = 2) + { + return CreateControlCommandEnvelope( + correlationId, + MxCommandKind.Ping, + command => command.Ping = new PingCommand { Message = message }, + sequence); + } + + private static WorkerEnvelope CreateControlCommandEnvelope( + string correlationId, + MxCommandKind kind, + Action configurePayload, + ulong sequence = 2) + { + MxCommand command = new() { Kind = kind }; + configurePayload(command); + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = SessionId, + Sequence = sequence, + CorrelationId = correlationId, + WorkerCommand = new WorkerCommand + { + Command = command, + EnqueueTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }; + } + private static WorkerEnvelope CreateCancelEnvelope(string correlationId, ulong sequence = 2) { return new WorkerEnvelope diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs index e361819..7bd6ea7 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/TestSupport/FakeRuntimeSession.cs @@ -122,11 +122,26 @@ internal sealed class FakeRuntimeSession : IWorkerRuntimeSession } } + /// + /// When set, returns no events for the + /// WorkerPipeSession background drain loop's fixed batch size, so an + /// explicit DrainEvents control command (which drains all via + /// maxEvents == 0) can claim the queued events deterministically + /// without racing the 25 ms background loop. Mirrors + /// WorkerPipeSession.EventDrainBatchSize. + /// + public uint? SuppressDrainForBatchSize { get; set; } + /// Drains queued events up to the specified limit. /// Maximum events to drain; 0 drains all. /// The drained events. public IReadOnlyList DrainEvents(uint maxEvents) { + if (SuppressDrainForBatchSize is uint suppressed && maxEvents == suppressed) + { + return Array.Empty(); + } + lock (gate) { int drainCount = maxEvents == 0 diff --git a/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs index 238b77a..de4f199 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -378,6 +378,22 @@ public sealed class WorkerPipeSession switch (envelope.BodyCase) { case WorkerEnvelope.BodyOneofCase.WorkerCommand: + // Worker control/lifecycle commands (Ping, GetSessionState, + // GetWorkerInfo, DrainEvents, ShutdownWorker) are answered here + // on the message-loop thread instead of being dispatched onto + // the STA. Their replies are built from process-level state + // (worker process id, assembly version, _state, the runtime + // session's event queue) that the STA-bound + // MxAccessCommandExecutor cannot see, and ShutdownWorker must + // return its OK reply BEFORE the graceful shutdown joins the + // STA thread — running it on the STA would deadlock. Returning + // false from the ShutdownWorker arm stops the read loop exactly + // as a WorkerShutdown envelope would. + if (IsControlCommand(envelope.WorkerCommand?.Command?.Kind ?? MxCommandKind.Unspecified)) + { + return await HandleControlCommandAsync(envelope, cancellationToken).ConfigureAwait(false); + } + TryStartCommandTask(envelope, cancellationToken); return true; case WorkerEnvelope.BodyOneofCase.WorkerShutdown: @@ -393,6 +409,175 @@ public sealed class WorkerPipeSession } } + private static bool IsControlCommand(MxCommandKind kind) + { + return kind switch + { + MxCommandKind.Ping => true, + MxCommandKind.GetSessionState => true, + MxCommandKind.GetWorkerInfo => true, + MxCommandKind.DrainEvents => true, + MxCommandKind.ShutdownWorker => true, + _ => false, + }; + } + + /// + /// Answers a worker control/lifecycle command on the message-loop + /// thread (never on the STA). Returns false only for + /// — after writing its OK + /// reply this drives the same graceful-shutdown path a + /// WorkerShutdown envelope would, then signals the read loop to + /// stop. All other control commands return true to keep reading. + /// + private async Task HandleControlCommandAsync( + WorkerEnvelope envelope, + CancellationToken cancellationToken) + { + WorkerCommand workerCommand = envelope.WorkerCommand; + MxCommand command = workerCommand.Command; + string correlationId = envelope.CorrelationId; + + if (command.Kind == MxCommandKind.ShutdownWorker) + { + // Build and emit the OK reply BEFORE triggering shutdown so the + // gateway's correlation-id wait is satisfied even though the + // graceful shutdown below tears the session (and pipe) down. + MxCommandReply shutdownReply = CreateControlOkReply(correlationId, command.Kind); + await WriteControlReplyAsync(shutdownReply, cancellationToken).ConfigureAwait(false); + + WorkerShutdown shutdown = new(); + if (command.ShutdownWorker?.GracePeriod is not null) + { + shutdown.GracePeriod = command.ShutdownWorker.GracePeriod; + } + + shutdown.Reason = "ShutdownWorker command"; + await ShutdownAsync(shutdown, cancellationToken).ConfigureAwait(false); + return false; + } + + MxCommandReply reply = command.Kind switch + { + MxCommandKind.Ping => CreatePingReply(correlationId, command), + MxCommandKind.GetSessionState => CreateSessionStateReply(correlationId, command.Kind), + MxCommandKind.GetWorkerInfo => CreateWorkerInfoReply(correlationId, command.Kind), + MxCommandKind.DrainEvents => CreateDrainEventsReply(correlationId, command), + _ => CreateControlOkReply(correlationId, command.Kind), + }; + + await WriteControlReplyAsync(reply, cancellationToken).ConfigureAwait(false); + return true; + } + + private Task WriteControlReplyAsync( + MxCommandReply reply, + CancellationToken cancellationToken) + { + return _writer.WriteAsync( + CreateEnvelope(new WorkerCommandReply + { + Reply = reply, + CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }), + cancellationToken); + } + + private MxCommandReply CreatePingReply(string correlationId, MxCommand command) + { + MxCommandReply reply = CreateControlOkReply(correlationId, command.Kind); + + // Echo the ping message back through the base reply's diagnostic + // message field (there is no dedicated PingReply payload). An empty + // message leaves the diagnostic field at its proto3 default. + string? message = command.Ping?.Message; + if (!string.IsNullOrEmpty(message)) + { + reply.DiagnosticMessage = message; + } + + return reply; + } + + private MxCommandReply CreateSessionStateReply(string correlationId, MxCommandKind kind) + { + MxCommandReply reply = CreateControlOkReply(correlationId, kind); + reply.SessionState = new SessionStateReply + { + State = MapWorkerStateToSessionState(_state), + }; + return reply; + } + + private MxCommandReply CreateWorkerInfoReply(string correlationId, MxCommandKind kind) + { + MxCommandReply reply = CreateControlOkReply(correlationId, kind); + reply.WorkerInfo = new WorkerInfoReply + { + WorkerProcessId = _processIdProvider(), + WorkerVersion = typeof(WorkerPipeSession).Assembly.GetName().Version?.ToString() ?? string.Empty, + MxaccessProgid = MxAccessInteropInfo.ProgId, + MxaccessClsid = MxAccessInteropInfo.Clsid, + }; + return reply; + } + + private MxCommandReply CreateDrainEventsReply(string correlationId, MxCommand command) + { + MxCommandReply reply = CreateControlOkReply(correlationId, command.Kind); + DrainEventsReply drainReply = new(); + + IWorkerRuntimeSession? runtimeSession = _runtimeSession; + if (runtimeSession is not null) + { + uint maxEvents = command.DrainEvents?.MaxEvents ?? 0; + foreach (WorkerEvent workerEvent in runtimeSession.DrainEvents(maxEvents)) + { + if (workerEvent.Event is not null) + { + drainReply.Events.Add(workerEvent.Event); + } + } + } + + reply.DrainEvents = drainReply; + return reply; + } + + private MxCommandReply CreateControlOkReply(string correlationId, MxCommandKind kind) + { + return new MxCommandReply + { + SessionId = _options.SessionId, + CorrelationId = correlationId, + Kind = kind, + Hresult = 0, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }; + } + + private static SessionState MapWorkerStateToSessionState(WorkerState state) + { + return state switch + { + WorkerState.Starting => SessionState.StartingWorker, + WorkerState.Handshaking => SessionState.Handshaking, + WorkerState.InitializingSta => SessionState.InitializingWorker, + WorkerState.Ready => SessionState.Ready, + // A control command is being served, so the STA is alive and + // ready — the busy state is incidental, not a distinct lifecycle. + WorkerState.ExecutingCommand => SessionState.Ready, + WorkerState.ShuttingDown => SessionState.Closing, + WorkerState.Stopped => SessionState.Closed, + WorkerState.Faulted => SessionState.Faulted, + _ => SessionState.Unspecified, + }; + } + private async Task ProcessCommandAsync( WorkerEnvelope envelope, CancellationToken cancellationToken)