From 4a3560c7ee679b8db1246ac77e69ebc6b7f8a7c7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 19:12:06 -0400 Subject: [PATCH] Implement worker heartbeat watchdog --- docs/GatewayTesting.md | 3 + docs/mxaccess-worker-instance-design.md | 28 +- .../Gateway/Workers/FakeWorkerHarnessTests.cs | 19 + .../Workers/Fakes/FakeWorkerHarness.cs | 20 + .../Gateway/Workers/WorkerClientTests.cs | 34 ++ .../Bootstrap/WorkerApplicationTests.cs | 2 +- .../Ipc/WorkerPipeClientTests.cs | 89 +++- .../Ipc/WorkerPipeSessionTests.cs | 407 ++++++++++++++++++ src/MxGateway.Worker/Ipc/WorkerPipeClient.cs | 16 +- src/MxGateway.Worker/Ipc/WorkerPipeSession.cs | 334 +++++++++++++- .../Ipc/WorkerPipeSessionOptions.cs | 36 ++ .../MxAccess/IWorkerRuntimeSession.cs | 20 + .../MxAccess/MxAccessStaSession.cs | 28 +- .../WorkerRuntimeHeartbeatSnapshot.cs | 30 ++ src/MxGateway.Worker/WorkerApplication.cs | 2 +- 15 files changed, 1048 insertions(+), 20 deletions(-) create mode 100644 src/MxGateway.Worker/Ipc/WorkerPipeSessionOptions.cs create mode 100644 src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs create mode 100644 src/MxGateway.Worker/MxAccess/WorkerRuntimeHeartbeatSnapshot.cs diff --git a/docs/GatewayTesting.md b/docs/GatewayTesting.md index 6412a7d..040707a 100644 --- a/docs/GatewayTesting.md +++ b/docs/GatewayTesting.md @@ -18,6 +18,7 @@ starting `MxGateway.Worker.exe` or loading MXAccess COM. The harness scripts: - `WorkerHello` and `WorkerReady` startup, - command replies with matching correlation ids, - ordered `WorkerEvent` frames, +- `WorkerHeartbeat` frames, - `WorkerFault` frames, - shutdown acknowledgements, - malformed protobuf payloads and oversized frame headers, @@ -43,6 +44,8 @@ event streaming behavior: dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~FakeWorkerHarnessTests dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~GatewayEndToEndFakeWorkerSmokeTests +dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~WorkerClientTests +dotnet test src/MxGateway.Worker.Tests/MxGateway.Worker.Tests.csproj -p:Platform=x86 --filter FullyQualifiedName~WorkerPipeSessionTests ``` Run the gateway test project after shared gateway test infrastructure changes: diff --git a/docs/mxaccess-worker-instance-design.md b/docs/mxaccess-worker-instance-design.md index 05bef4b..5c4aeb4 100644 --- a/docs/mxaccess-worker-instance-design.md +++ b/docs/mxaccess-worker-instance-design.md @@ -576,13 +576,19 @@ Do not drop or coalesce events in v1. ## Heartbeat And Watchdog -The worker heartbeat should prove that: +`WorkerPipeSession` starts the heartbeat loop after the gateway validates +`WorkerHello` and receives `WorkerReady`. Heartbeats continue until +`WorkerShutdown`, cancellation, or a pipe/protocol failure stops the session. +The loop uses `WorkerPipeSessionOptions.HeartbeatInterval`; the default matches +the gateway worker heartbeat interval. + +The worker heartbeat proves that: - pipe writer is alive, - worker host is alive, - STA has recently pumped or completed work. -Heartbeat payload should include: +Heartbeat payload includes: - worker process id, - session id, @@ -593,13 +599,19 @@ Heartbeat payload should include: - event sequence, - current command correlation id if any. -The STA watchdog should warn when: +`MxAccessStaSession.CaptureHeartbeat()` reads `StaRuntime.LastActivityUtc` and +`StaCommandDispatcher` queue state without touching the raw MXAccess COM object +outside the STA. Event queue depth and event sequence are reported as zero until +the event queue implementation owns those counters. -- one command exceeds its expected duration, -- the STA has not pumped messages within the heartbeat grace period, -- event queue depth remains high. - -The worker can report the problem, but the gateway owns the final kill decision. +The STA watchdog currently emits a `WorkerFault` with +`WorkerFaultCategory.StaHung` when `LastStaActivityUtc` is older than +`WorkerPipeSessionOptions.HeartbeatGrace`. The fault includes the current +command correlation id when a command is active. Command duration and high event +queue depth remain observable through heartbeat fields until dedicated +thresholds own those warnings. The worker reports stale STA activity, but the +gateway owns the final kill decision through its existing heartbeat and worker +lifecycle policy. ## Shutdown diff --git a/src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs b/src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs index b5daed4..c9daa54 100644 --- a/src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs @@ -105,6 +105,25 @@ public sealed class FakeWorkerHarnessTests Assert.Equal(WorkerClientState.Faulted, client.State); } + [Fact] + public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + DateTimeOffset previousHeartbeat = client.LastHeartbeatAt; + + await Task.Delay(TimeSpan.FromMilliseconds(20)); + await fakeWorker.SendHeartbeatAsync( + configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468); + + await WaitUntilAsync( + () => client.ProcessId == 2468 && client.LastHeartbeatAt > previousHeartbeat, + TestTimeout); + + Assert.Equal(WorkerClientState.Ready, client.State); + } + [Fact] public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand() { diff --git a/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs b/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs index 5981618..c9e5883 100644 --- a/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs +++ b/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs @@ -284,6 +284,26 @@ public sealed class FakeWorkerHarness : IAsyncDisposable cancellationToken).ConfigureAwait(false); } + public async Task SendHeartbeatAsync( + WorkerState state = WorkerState.Ready, + CancellationToken cancellationToken = default, + Action? configureHeartbeat = null) + { + WorkerHeartbeat heartbeat = new() + { + WorkerProcessId = DefaultWorkerProcessId, + State = state, + LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }; + configureHeartbeat?.Invoke(heartbeat); + + await _writer.WriteAsync( + CreateEnvelope( + correlationId: string.Empty, + envelope => envelope.WorkerHeartbeat = heartbeat), + cancellationToken).ConfigureAwait(false); + } + public async Task SendShutdownAckAsync( ProtocolStatusCode statusCode = ProtocolStatusCode.Ok, CancellationToken cancellationToken = default) diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs index cf55511..1d8e03b 100644 --- a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs @@ -1,4 +1,5 @@ using System.IO.Pipes; +using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts; using MxGateway.Contracts.Proto; using MxGateway.Server.Workers; @@ -151,6 +152,24 @@ public sealed class WorkerClientTests Assert.Equal(WorkerClientState.Faulted, client.State); } + [Fact] + public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess() + { + await using PipePair pipePair = await PipePair.CreateAsync(); + await using WorkerClient client = CreateClient(pipePair); + await CompleteHandshakeAsync(client, pipePair); + DateTimeOffset previousHeartbeat = client.LastHeartbeatAt; + + await Task.Delay(TimeSpan.FromMilliseconds(20)); + await pipePair.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: 9876)); + + await WaitUntilAsync( + () => client.ProcessId == 9876 && client.LastHeartbeatAt > previousHeartbeat, + TestTimeout); + + Assert.Equal(WorkerClientState.Ready, client.State); + } + [Fact] public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient() { @@ -276,6 +295,21 @@ public sealed class WorkerClientTests }); } + private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId) + { + return CreateWorkerEnvelope( + correlationId: string.Empty, + sequence: 20, + envelope => envelope.WorkerHeartbeat = new WorkerHeartbeat + { + WorkerProcessId = workerProcessId, + State = WorkerState.Ready, + LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + PendingCommandCount = 0, + OutboundEventQueueDepth = 0, + }); + } + private static WorkerEnvelope CreateWorkerEnvelope( string correlationId, ulong sequence, diff --git a/src/MxGateway.Worker.Tests/Bootstrap/WorkerApplicationTests.cs b/src/MxGateway.Worker.Tests/Bootstrap/WorkerApplicationTests.cs index a142266..cfda899 100644 --- a/src/MxGateway.Worker.Tests/Bootstrap/WorkerApplicationTests.cs +++ b/src/MxGateway.Worker.Tests/Bootstrap/WorkerApplicationTests.cs @@ -30,7 +30,7 @@ public sealed class WorkerApplicationTests Assert.Equal("mxaccess-gateway-123-session-1", entry.Fields["pipe_name"]); Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, entry.Fields["protocol_version"]); Assert.Equal("[redacted]", entry.Fields["nonce"]); - Assert.Equal("WorkerPipeHandshakeSucceeded", logger.Entries[1].EventName); + Assert.Equal("WorkerPipeSessionCompleted", logger.Entries[1].EventName); } [Fact] diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs index 36caa96..958ec75 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs @@ -1,10 +1,15 @@ using System; +using System.IO; using System.IO.Pipes; +using System.Threading; using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts; using MxGateway.Contracts.Proto; using MxGateway.Worker.Bootstrap; using MxGateway.Worker.Ipc; +using MxGateway.Worker.MxAccess; +using MxGateway.Worker.Sta; namespace MxGateway.Worker.Tests.Ipc; @@ -28,7 +33,9 @@ public sealed class WorkerPipeClientTests PipeTransmissionMode.Byte, PipeOptions.Asynchronous); - WorkerPipeClient client = new(connectTimeoutMilliseconds: 5000); + WorkerPipeClient client = new( + connectTimeoutMilliseconds: 5000, + (stream, options) => CreateSession(stream, options)); Task clientTask = client.RunAsync(workerOptions); await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null); @@ -56,6 +63,86 @@ public sealed class WorkerPipeClientTests WorkerEnvelope ready = await reader.ReadAsync(); Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase); + await writer.WriteAsync(new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = "session-1", + Sequence = 2, + WorkerShutdown = new WorkerShutdown + { + GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + Reason = "test-complete", + }, + }); + + WorkerEnvelope shutdownAck = await reader.ReadAsync(); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, shutdownAck.BodyCase); await clientTask; } + + private static WorkerPipeSession CreateSession( + Stream stream, + WorkerFrameProtocolOptions options) + { + return new WorkerPipeSession( + new WorkerFrameReader(stream, options), + new WorkerFrameWriter(stream, options), + options, + () => 1234, + new WorkerPipeSessionOptions + { + HeartbeatInterval = TimeSpan.FromSeconds(30), + HeartbeatGrace = TimeSpan.FromSeconds(30), + }, + () => new FakeRuntimeSession()); + } + + private sealed class FakeRuntimeSession : IWorkerRuntimeSession + { + public Task StartAsync( + int workerProcessId, + CancellationToken cancellationToken = default) + { + return Task.FromResult(new WorkerReady + { + WorkerProcessId = workerProcessId, + MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId, + MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid, + ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }); + } + + public Task DispatchAsync(StaCommand command) + { + return Task.FromResult(new MxCommandReply + { + SessionId = command.SessionId, + CorrelationId = command.CorrelationId, + Kind = command.Kind, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }); + } + + public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat() + { + return new WorkerRuntimeHeartbeatSnapshot( + DateTimeOffset.UtcNow, + pendingCommandCount: 0, + outboundEventQueueDepth: 0, + lastEventSequence: 0, + currentCommandCorrelationId: string.Empty); + } + + public void RequestShutdown() + { + } + + public void Dispose() + { + } + } } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index e1670d7..817582d 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -1,11 +1,16 @@ +using System; using System.Collections.Generic; using System.IO; +using System.IO.Pipes; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts; using MxGateway.Contracts.Proto; using MxGateway.Worker.Ipc; +using MxGateway.Worker.MxAccess; +using MxGateway.Worker.Sta; namespace MxGateway.Worker.Tests.Ipc; @@ -147,6 +152,127 @@ public sealed class WorkerPipeSessionTests Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code); } + [Fact] + public async Task RunAsync_SendsHeartbeatPayloadFromRuntimeSnapshot() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot( + DateTimeOffset.UtcNow, + pendingCommandCount: 2, + outboundEventQueueDepth: 3, + lastEventSequence: 42, + currentCommandCorrelationId: "current-command")); + WorkerPipeSession session = CreatePipeSession( + pipePair.WorkerStream, + runtime, + new WorkerPipeSessionOptions + { + HeartbeatInterval = TimeSpan.FromMilliseconds(20), + HeartbeatGrace = TimeSpan.FromSeconds(5), + }); + Task runTask = session.RunAsync(cancellation.Token); + + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + await ThrowIfCompletedAsync(runTask); + + WorkerEnvelope heartbeat = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerHeartbeat, + cancellation.Token); + + Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State); + Assert.Equal(1234, heartbeat.WorkerHeartbeat.WorkerProcessId); + Assert.Equal(2u, heartbeat.WorkerHeartbeat.PendingCommandCount); + Assert.Equal(3u, heartbeat.WorkerHeartbeat.OutboundEventQueueDepth); + Assert.Equal(42UL, heartbeat.WorkerHeartbeat.LastEventSequence); + Assert.Equal("current-command", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + [Fact] + public async Task RunAsync_WhenCommandIsExecuting_HeartbeatReportsCurrentCorrelation() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new() + { + BlockDispatch = true, + }; + WorkerPipeSession session = CreatePipeSession( + pipePair.WorkerStream, + runtime, + new WorkerPipeSessionOptions + { + HeartbeatInterval = TimeSpan.FromMilliseconds(20), + HeartbeatGrace = TimeSpan.FromSeconds(5), + }); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + await pipePair.GatewayWriter.WriteAsync( + CreateCommandEnvelope("command-1"), + cancellation.Token); + + Assert.True(runtime.DispatchStarted.Wait(TimeSpan.FromSeconds(2))); + WorkerEnvelope heartbeat = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerHeartbeat, + envelope => envelope.WorkerHeartbeat.CurrentCommandCorrelationId == "command-1", + cancellation.Token); + + Assert.Equal("command-1", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId); + Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State); + + runtime.ReleaseDispatch(); + WorkerEnvelope reply = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerCommandReply, + cancellation.Token); + + Assert.Equal("command-1", reply.CorrelationId); + Assert.Equal(ProtocolStatusCode.Ok, reply.WorkerCommandReply.Reply.ProtocolStatus.Code); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + [Fact] + public async Task RunAsync_WhenStaActivityIsStale_WritesWatchdogFault() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot( + DateTimeOffset.UtcNow - TimeSpan.FromSeconds(5), + pendingCommandCount: 0, + outboundEventQueueDepth: 0, + lastEventSequence: 0, + currentCommandCorrelationId: "stuck-command")); + WorkerPipeSession session = CreatePipeSession( + pipePair.WorkerStream, + runtime, + new WorkerPipeSessionOptions + { + HeartbeatInterval = TimeSpan.FromMilliseconds(20), + HeartbeatGrace = TimeSpan.FromMilliseconds(50), + }); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + WorkerEnvelope fault = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerFault, + cancellation.Token); + + Assert.Equal(WorkerFaultCategory.StaHung, fault.WorkerFault.Category); + Assert.Equal("stuck-command", fault.WorkerFault.CommandMethod); + Assert.Contains("STA activity is stale", fault.WorkerFault.DiagnosticMessage); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + private static WorkerPipeSession CreateSession( Stream inbound, Stream outbound, @@ -159,6 +285,21 @@ public sealed class WorkerPipeSessionTests () => 1234); } + private static WorkerPipeSession CreatePipeSession( + Stream stream, + FakeRuntimeSession runtime, + WorkerPipeSessionOptions sessionOptions) + { + WorkerFrameProtocolOptions options = CreateOptions(); + return new WorkerPipeSession( + new WorkerFrameReader(stream, options), + new WorkerFrameWriter(stream, options), + options, + () => 1234, + sessionOptions, + () => runtime); + } + private static WorkerFrameProtocolOptions CreateOptions() { return new WorkerFrameProtocolOptions( @@ -185,6 +326,119 @@ public sealed class WorkerPipeSessionTests }; } + private static WorkerEnvelope CreateCommandEnvelope(string correlationId) + { + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = SessionId, + Sequence = 2, + CorrelationId = correlationId, + WorkerCommand = new WorkerCommand + { + Command = new MxCommand + { + Kind = MxCommandKind.Ping, + Ping = new PingCommand + { + Message = "ping", + }, + }, + EnqueueTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }; + } + + private static WorkerEnvelope CreateShutdownEnvelope() + { + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = SessionId, + Sequence = 3, + WorkerShutdown = new WorkerShutdown + { + GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + Reason = "test-complete", + }, + }; + } + + private static async Task CompleteGatewayHandshakeAsync( + PipePair pipePair, + CancellationToken cancellationToken) + { + await pipePair.GatewayWriter + .WriteAsync(CreateGatewayHelloEnvelope(), cancellationToken) + .ConfigureAwait(false); + + WorkerEnvelope hello = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false); + WorkerEnvelope ready = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false); + + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, hello.BodyCase); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase); + } + + private static async Task SendShutdownAndWaitAsync( + PipePair pipePair, + Task runTask, + CancellationToken cancellationToken) + { + await pipePair.GatewayWriter + .WriteAsync(CreateShutdownEnvelope(), cancellationToken) + .ConfigureAwait(false); + + WorkerEnvelope shutdownAck = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, + cancellationToken); + + Assert.Equal(ProtocolStatusCode.Ok, shutdownAck.WorkerShutdownAck.Status.Code); + Task completedTask = await Task + .WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(2), cancellationToken)) + .ConfigureAwait(false); + + Assert.Same(runTask, completedTask); + await runTask.ConfigureAwait(false); + } + + private static async Task ThrowIfCompletedAsync(Task task) + { + await Task.Delay(TimeSpan.FromMilliseconds(100)); + if (task.IsCompleted) + { + await task; + } + } + + private static Task ReadUntilAsync( + WorkerFrameReader reader, + WorkerEnvelope.BodyOneofCase expectedBody, + CancellationToken cancellationToken) + { + return ReadUntilAsync( + reader, + expectedBody, + _ => true, + cancellationToken); + } + + private static async Task ReadUntilAsync( + WorkerFrameReader reader, + WorkerEnvelope.BodyOneofCase expectedBody, + Func predicate, + CancellationToken cancellationToken) + { + while (true) + { + WorkerEnvelope envelope = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); + if (envelope.BodyCase == expectedBody && predicate(envelope)) + { + return envelope; + } + } + } + private static WorkerEnvelope[] ReadWrittenFrames( MemoryStream stream, WorkerFrameProtocolOptions options) @@ -219,4 +473,157 @@ public sealed class WorkerPipeSessionTests buffer[2] = (byte)(value >> 16); buffer[3] = (byte)(value >> 24); } + + private sealed class FakeRuntimeSession : IWorkerRuntimeSession + { + private readonly ManualResetEventSlim releaseDispatch = new(false); + private readonly object gate = new(); + private WorkerRuntimeHeartbeatSnapshot snapshot = new( + DateTimeOffset.UtcNow, + pendingCommandCount: 0, + outboundEventQueueDepth: 0, + lastEventSequence: 0, + currentCommandCorrelationId: string.Empty); + + public ManualResetEventSlim DispatchStarted { get; } = new(false); + + public bool BlockDispatch { get; set; } + + public Task StartAsync( + int workerProcessId, + CancellationToken cancellationToken = default) + { + return Task.FromResult(new WorkerReady + { + WorkerProcessId = workerProcessId, + MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId, + MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid, + ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }); + } + + public Task DispatchAsync(StaCommand command) + { + return Task.Run( + () => + { + SetSnapshot(new WorkerRuntimeHeartbeatSnapshot( + DateTimeOffset.UtcNow, + pendingCommandCount: 0, + outboundEventQueueDepth: 0, + lastEventSequence: 0, + command.CorrelationId)); + DispatchStarted.Set(); + + if (BlockDispatch) + { + releaseDispatch.Wait(TimeSpan.FromSeconds(5)); + } + + SetSnapshot(new WorkerRuntimeHeartbeatSnapshot( + DateTimeOffset.UtcNow, + pendingCommandCount: 0, + outboundEventQueueDepth: 0, + lastEventSequence: 0, + currentCommandCorrelationId: string.Empty)); + + return new MxCommandReply + { + SessionId = command.SessionId, + CorrelationId = command.CorrelationId, + Kind = command.Kind, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }; + }); + } + + public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat() + { + lock (gate) + { + return snapshot; + } + } + + public void RequestShutdown() + { + releaseDispatch.Set(); + } + + public void ReleaseDispatch() + { + releaseDispatch.Set(); + } + + public void SetSnapshot(WorkerRuntimeHeartbeatSnapshot value) + { + lock (gate) + { + snapshot = value; + } + } + + public void Dispose() + { + releaseDispatch.Set(); + releaseDispatch.Dispose(); + DispatchStarted.Dispose(); + } + } + + private sealed class PipePair : IDisposable + { + private readonly NamedPipeServerStream gatewayStream; + + private PipePair( + NamedPipeServerStream gatewayStream, + NamedPipeClientStream workerStream) + { + this.gatewayStream = gatewayStream; + WorkerStream = workerStream; + WorkerFrameProtocolOptions options = CreateOptions(); + GatewayReader = new WorkerFrameReader(gatewayStream, options); + GatewayWriter = new WorkerFrameWriter(gatewayStream, options); + } + + public Stream WorkerStream { get; } + + public WorkerFrameReader GatewayReader { get; } + + public WorkerFrameWriter GatewayWriter { get; } + + public static async Task CreateAsync(CancellationToken cancellationToken) + { + string pipeName = $"mxaccessgw-worker-session-tests-{Guid.NewGuid():N}"; + NamedPipeServerStream gatewayStream = new( + pipeName, + PipeDirection.InOut, + maxNumberOfServerInstances: 1, + PipeTransmissionMode.Byte, + PipeOptions.Asynchronous); + NamedPipeClientStream workerStream = new( + ".", + pipeName, + PipeDirection.InOut, + PipeOptions.Asynchronous); + + Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync(); + await Task + .Run(() => workerStream.Connect(5000), cancellationToken) + .ConfigureAwait(false); + await waitForConnectionTask.ConfigureAwait(false); + + return new PipePair(gatewayStream, workerStream); + } + + public void Dispose() + { + WorkerStream.Dispose(); + gatewayStream.Dispose(); + } + } } diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs index e9e408f..d60ce04 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs @@ -1,4 +1,5 @@ using System; +using System.IO; using System.IO.Pipes; using System.Threading; using System.Threading.Tasks; @@ -11,6 +12,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient public const int DefaultConnectTimeoutMilliseconds = 30000; private readonly int _connectTimeoutMilliseconds; + private readonly Func _sessionFactory; public WorkerPipeClient() : this(DefaultConnectTimeoutMilliseconds) @@ -18,6 +20,15 @@ public sealed class WorkerPipeClient : IWorkerPipeClient } public WorkerPipeClient(int connectTimeoutMilliseconds) + : this( + connectTimeoutMilliseconds, + (stream, frameOptions) => new WorkerPipeSession(stream, frameOptions)) + { + } + + public WorkerPipeClient( + int connectTimeoutMilliseconds, + Func sessionFactory) { if (connectTimeoutMilliseconds <= 0) { @@ -26,6 +37,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient "Worker pipe connect timeout must be greater than zero."); } + _sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory)); _connectTimeoutMilliseconds = connectTimeoutMilliseconds; } @@ -48,8 +60,8 @@ public sealed class WorkerPipeClient : IWorkerPipeClient await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false); - WorkerPipeSession session = new(pipe, frameOptions); - await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false); + WorkerPipeSession session = _sessionFactory(pipe, frameOptions); + await session.RunAsync(cancellationToken).ConfigureAwait(false); } private Task ConnectAsync( diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index 6cb9349..d7ac25d 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; using MxGateway.Worker.MxAccess; +using MxGateway.Worker.Sta; namespace MxGateway.Worker.Ipc; @@ -13,10 +14,14 @@ public sealed class WorkerPipeSession { private readonly WorkerFrameProtocolOptions _options; private readonly Func _processIdProvider; + private readonly Func _runtimeSessionFactory; + private readonly WorkerPipeSessionOptions _sessionOptions; private readonly WorkerFrameReader _reader; private readonly WorkerFrameWriter _writer; - private MxAccessStaSession? _mxAccessStaSession; + private IWorkerRuntimeSession? _runtimeSession; private long _nextSequence; + private WorkerState _state = WorkerState.Starting; + private bool _watchdogFaultSent; public WorkerPipeSession( Stream stream, @@ -34,11 +39,49 @@ public sealed class WorkerPipeSession WorkerFrameWriter writer, WorkerFrameProtocolOptions options, Func processIdProvider) + : this( + reader, + writer, + options, + processIdProvider, + new WorkerPipeSessionOptions(), + () => new MxAccessStaSession()) + { + } + + public WorkerPipeSession( + WorkerFrameReader reader, + WorkerFrameWriter writer, + WorkerFrameProtocolOptions options, + Func processIdProvider, + WorkerPipeSessionOptions sessionOptions, + Func runtimeSessionFactory) { _reader = reader ?? throw new ArgumentNullException(nameof(reader)); _writer = writer ?? throw new ArgumentNullException(nameof(writer)); _options = options ?? throw new ArgumentNullException(nameof(options)); _processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider)); + _sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions)); + _runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory)); + _sessionOptions.Validate(); + } + + public async Task RunAsync(CancellationToken cancellationToken = default) + { + _runtimeSession = _runtimeSessionFactory(); + try + { + await CompleteStartupHandshakeAsync( + token => _runtimeSession.StartAsync(_processIdProvider(), token), + cancellationToken).ConfigureAwait(false); + await RunMessageLoopAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + _runtimeSession?.Dispose(); + _runtimeSession = null; + _state = WorkerState.Stopped; + } } public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default) @@ -76,11 +119,14 @@ public sealed class WorkerPipeSession try { WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false); + _state = WorkerState.Handshaking; ValidateGatewayHello(envelope); await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false); + _state = WorkerState.InitializingSta; WorkerReady ready = await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false); await WriteWorkerReadyAsync(ready, cancellationToken).ConfigureAwait(false); + _state = WorkerState.Ready; } catch (WorkerFrameProtocolException exception) { @@ -140,6 +186,174 @@ public sealed class WorkerPipeSession return _writer.WriteAsync(CreateEnvelope(ready), cancellationToken); } + private async Task RunMessageLoopAsync(CancellationToken cancellationToken) + { + using CancellationTokenSource heartbeatCancellation = CancellationTokenSource + .CreateLinkedTokenSource(cancellationToken); + Task heartbeatTask = RunHeartbeatLoopAsync(heartbeatCancellation.Token); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + Task readTask = _reader.ReadAsync(cancellationToken); + Task completedTask = await Task.WhenAny(readTask, heartbeatTask).ConfigureAwait(false); + if (completedTask == heartbeatTask) + { + await heartbeatTask.ConfigureAwait(false); + } + + WorkerEnvelope envelope = await readTask.ConfigureAwait(false); + bool keepReading = await DispatchGatewayEnvelopeAsync(envelope, cancellationToken).ConfigureAwait(false); + if (!keepReading) + { + return; + } + } + } + finally + { + heartbeatCancellation.Cancel(); + try + { + await heartbeatTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + } + + private async Task DispatchGatewayEnvelopeAsync( + WorkerEnvelope envelope, + CancellationToken cancellationToken) + { + switch (envelope.BodyCase) + { + case WorkerEnvelope.BodyOneofCase.WorkerCommand: + _ = ProcessCommandAsync(envelope, cancellationToken); + return true; + case WorkerEnvelope.BodyOneofCase.WorkerShutdown: + await ShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false); + return false; + case WorkerEnvelope.BodyOneofCase.WorkerCancel: + return true; + default: + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody, + $"Worker received unexpected gateway envelope body {envelope.BodyCase}."); + } + } + + private async Task ProcessCommandAsync( + WorkerEnvelope envelope, + CancellationToken cancellationToken) + { + IWorkerRuntimeSession runtimeSession = _runtimeSession + ?? throw new InvalidOperationException("Worker runtime session has not been initialized."); + WorkerCommand workerCommand = envelope.WorkerCommand; + MxCommand command = workerCommand.Command; + StaCommand staCommand = new( + _options.SessionId, + envelope.CorrelationId, + command, + workerCommand.EnqueueTimestamp, + cancellationToken); + + try + { + MxCommandReply reply = await runtimeSession.DispatchAsync(staCommand).ConfigureAwait(false); + await _writer + .WriteAsync( + CreateEnvelope(new WorkerCommandReply + { + Reply = reply, + CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }), + cancellationToken) + .ConfigureAwait(false); + } + catch (Exception exception) when (exception is not OperationCanceledException) + { + _state = WorkerState.Faulted; + await TryWriteFaultAsync( + CreateFault( + WorkerFaultCategory.MxaccessCommandFailed, + staCommand.MethodName, + exception), + cancellationToken).ConfigureAwait(false); + } + } + + private async Task ShutdownAsync( + WorkerShutdown shutdown, + CancellationToken cancellationToken) + { + _state = WorkerState.ShuttingDown; + _runtimeSession?.RequestShutdown(); + + await _writer + .WriteAsync( + CreateEnvelope( + new WorkerShutdownAck + { + Status = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = string.IsNullOrWhiteSpace(shutdown.Reason) + ? "Worker shutdown accepted." + : $"Worker shutdown accepted: {shutdown.Reason}", + }, + }), + cancellationToken) + .ConfigureAwait(false); + } + + private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false); + IWorkerRuntimeSession? runtimeSession = _runtimeSession; + if (runtimeSession is null) + { + continue; + } + + WorkerRuntimeHeartbeatSnapshot snapshot = runtimeSession.CaptureHeartbeat(); + await _writer + .WriteAsync(CreateEnvelope(CreateHeartbeat(snapshot)), cancellationToken) + .ConfigureAwait(false); + + await ReportWatchdogFaultIfNeededAsync(snapshot, cancellationToken).ConfigureAwait(false); + } + } + + private async Task ReportWatchdogFaultIfNeededAsync( + WorkerRuntimeHeartbeatSnapshot snapshot, + CancellationToken cancellationToken) + { + TimeSpan staleFor = DateTimeOffset.UtcNow - snapshot.LastStaActivityUtc; + if (staleFor <= _sessionOptions.HeartbeatGrace) + { + _watchdogFaultSent = false; + return; + } + + if (_watchdogFaultSent) + { + return; + } + + _watchdogFaultSent = true; + await TryWriteFaultAsync( + CreateFault( + WorkerFaultCategory.StaHung, + snapshot.CurrentCommandCorrelationId, + $"STA activity is stale by {staleFor}."), + cancellationToken).ConfigureAwait(false); + } + private async Task TryWriteFaultAsync( WorkerFrameProtocolException exception, CancellationToken cancellationToken) @@ -178,6 +392,25 @@ public sealed class WorkerPipeSession } } + private async Task TryWriteFaultAsync( + WorkerFault fault, + CancellationToken cancellationToken) + { + try + { + await _writer + .WriteAsync(CreateEnvelope(fault), cancellationToken) + .ConfigureAwait(false); + } + catch (Exception faultWriteException) when ( + faultWriteException is IOException + || faultWriteException is ObjectDisposedException + || faultWriteException is WorkerFrameProtocolException) + { + // The runtime fault remains observable through worker exit or pipe closure. + } + } + private WorkerEnvelope CreateEnvelope(WorkerHello hello) { return CreateBaseEnvelope(hello); @@ -193,6 +426,21 @@ public sealed class WorkerPipeSession return CreateBaseEnvelope(fault); } + private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply) + { + return CreateBaseEnvelope(reply); + } + + private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck) + { + return CreateBaseEnvelope(shutdownAck); + } + + private WorkerEnvelope CreateEnvelope(WorkerHeartbeat heartbeat) + { + return CreateBaseEnvelope(heartbeat); + } + private WorkerEnvelope CreateBaseEnvelope(WorkerHello body) { WorkerEnvelope envelope = CreateBaseEnvelope(); @@ -214,6 +462,28 @@ public sealed class WorkerPipeSession return envelope; } + private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.CorrelationId = body.Reply?.CorrelationId ?? string.Empty; + envelope.WorkerCommandReply = body; + return envelope; + } + + private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.WorkerShutdownAck = body; + return envelope; + } + + private WorkerEnvelope CreateBaseEnvelope(WorkerHeartbeat body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.WorkerHeartbeat = body; + return envelope; + } + private WorkerEnvelope CreateBaseEnvelope() { return new WorkerEnvelope @@ -231,21 +501,39 @@ public sealed class WorkerPipeSession private async Task InitializeMxAccessAsync(CancellationToken cancellationToken) { - _mxAccessStaSession = new MxAccessStaSession(); + _runtimeSession = new MxAccessStaSession(); try { - return await _mxAccessStaSession + return await _runtimeSession .StartAsync(_processIdProvider(), cancellationToken) .ConfigureAwait(false); } catch { - _mxAccessStaSession.Dispose(); - _mxAccessStaSession = null; + _runtimeSession.Dispose(); + _runtimeSession = null; throw; } } + private WorkerHeartbeat CreateHeartbeat(WorkerRuntimeHeartbeatSnapshot snapshot) + { + WorkerState state = string.IsNullOrWhiteSpace(snapshot.CurrentCommandCorrelationId) + ? _state + : WorkerState.ExecutingCommand; + + return new WorkerHeartbeat + { + WorkerProcessId = _processIdProvider(), + State = state, + LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(snapshot.LastStaActivityUtc), + PendingCommandCount = snapshot.PendingCommandCount, + OutboundEventQueueDepth = snapshot.OutboundEventQueueDepth, + LastEventSequence = snapshot.LastEventSequence, + CurrentCommandCorrelationId = snapshot.CurrentCommandCorrelationId, + }; + } + private WorkerReady CreateWorkerReady() { return new WorkerReady @@ -295,6 +583,42 @@ public sealed class WorkerPipeSession return fault; } + private static WorkerFault CreateFault( + WorkerFaultCategory category, + string commandMethod, + Exception exception) + { + WorkerFault fault = CreateFault( + category, + commandMethod, + exception.Message); + fault.ExceptionType = exception.GetType().FullName ?? string.Empty; + fault.ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.WorkerUnavailable, + Message = exception.Message, + }; + return fault; + } + + private static WorkerFault CreateFault( + WorkerFaultCategory category, + string commandMethod, + string diagnosticMessage) + { + return new WorkerFault + { + Category = category, + CommandMethod = commandMethod ?? string.Empty, + DiagnosticMessage = diagnosticMessage, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.WorkerUnavailable, + Message = diagnosticMessage, + }, + }; + } + private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode) { return errorCode switch diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSessionOptions.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSessionOptions.cs new file mode 100644 index 0000000..2e60463 --- /dev/null +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSessionOptions.cs @@ -0,0 +1,36 @@ +using System; + +namespace MxGateway.Worker.Ipc; + +public sealed class WorkerPipeSessionOptions +{ + public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(5); + public static readonly TimeSpan DefaultHeartbeatGrace = TimeSpan.FromSeconds(15); + + public WorkerPipeSessionOptions() + { + HeartbeatInterval = DefaultHeartbeatInterval; + HeartbeatGrace = DefaultHeartbeatGrace; + } + + public TimeSpan HeartbeatInterval { get; set; } + + public TimeSpan HeartbeatGrace { get; set; } + + public void Validate() + { + if (HeartbeatInterval <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(HeartbeatInterval), + "Worker heartbeat interval must be greater than zero."); + } + + if (HeartbeatGrace <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(HeartbeatGrace), + "Worker heartbeat grace must be greater than zero."); + } + } +} diff --git a/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs b/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs new file mode 100644 index 0000000..ada6a46 --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using MxGateway.Contracts.Proto; +using MxGateway.Worker.Sta; + +namespace MxGateway.Worker.MxAccess; + +public interface IWorkerRuntimeSession : IDisposable +{ + Task StartAsync( + int workerProcessId, + CancellationToken cancellationToken = default); + + Task DispatchAsync(StaCommand command); + + WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat(); + + void RequestShutdown(); +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs index de522f3..505c227 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -7,7 +7,7 @@ using MxGateway.Worker.Sta; namespace MxGateway.Worker.MxAccess; -public sealed class MxAccessStaSession : IDisposable +public sealed class MxAccessStaSession : IWorkerRuntimeSession { private readonly IMxAccessComObjectFactory factory; private readonly IMxAccessEventSink eventSink; @@ -68,6 +68,30 @@ public sealed class MxAccessStaSession : IDisposable return commandDispatcher.DispatchAsync(command); } + public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat() + { + uint pendingCommandCount = 0; + string currentCommandCorrelationId = string.Empty; + + if (commandDispatcher is not null) + { + pendingCommandCount = (uint)commandDispatcher.PendingCommandCount; + currentCommandCorrelationId = commandDispatcher.CurrentCommandCorrelationId; + } + + return new WorkerRuntimeHeartbeatSnapshot( + staRuntime.LastActivityUtc, + pendingCommandCount, + outboundEventQueueDepth: 0, + lastEventSequence: 0, + currentCommandCorrelationId); + } + + public void RequestShutdown() + { + commandDispatcher?.RequestShutdown(); + } + public Task> GetRegisteredServerHandlesAsync( CancellationToken cancellationToken = default) { @@ -101,7 +125,7 @@ public sealed class MxAccessStaSession : IDisposable return; } - commandDispatcher?.RequestShutdown(); + RequestShutdown(); if (session is not null) { diff --git a/src/MxGateway.Worker/MxAccess/WorkerRuntimeHeartbeatSnapshot.cs b/src/MxGateway.Worker/MxAccess/WorkerRuntimeHeartbeatSnapshot.cs new file mode 100644 index 0000000..cb4eba5 --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/WorkerRuntimeHeartbeatSnapshot.cs @@ -0,0 +1,30 @@ +using System; + +namespace MxGateway.Worker.MxAccess; + +public sealed class WorkerRuntimeHeartbeatSnapshot +{ + public WorkerRuntimeHeartbeatSnapshot( + DateTimeOffset lastStaActivityUtc, + uint pendingCommandCount, + uint outboundEventQueueDepth, + ulong lastEventSequence, + string currentCommandCorrelationId) + { + LastStaActivityUtc = lastStaActivityUtc; + PendingCommandCount = pendingCommandCount; + OutboundEventQueueDepth = outboundEventQueueDepth; + LastEventSequence = lastEventSequence; + CurrentCommandCorrelationId = currentCommandCorrelationId ?? string.Empty; + } + + public DateTimeOffset LastStaActivityUtc { get; } + + public uint PendingCommandCount { get; } + + public uint OutboundEventQueueDepth { get; } + + public ulong LastEventSequence { get; } + + public string CurrentCommandCorrelationId { get; } +} diff --git a/src/MxGateway.Worker/WorkerApplication.cs b/src/MxGateway.Worker/WorkerApplication.cs index 3bacc28..e1be4aa 100644 --- a/src/MxGateway.Worker/WorkerApplication.cs +++ b/src/MxGateway.Worker/WorkerApplication.cs @@ -84,7 +84,7 @@ public static class WorkerApplication pipeClient.RunAsync(options).GetAwaiter().GetResult(); - logger.Information("WorkerPipeHandshakeSucceeded", new Dictionary + logger.Information("WorkerPipeSessionCompleted", new Dictionary { ["session_id"] = options.SessionId, ["pipe_name"] = options.PipeName,