Implement worker heartbeat watchdog

This commit is contained in:
Joseph Doherty
2026-04-26 19:12:06 -04:00
parent a3ccd5c80b
commit 4a3560c7ee
15 changed files with 1048 additions and 20 deletions
@@ -105,6 +105,25 @@ public sealed class FakeWorkerHarnessTests
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
{
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
await using WorkerClient client = fakeWorker.CreateClient();
await StartClientAsync(fakeWorker, client);
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
await Task.Delay(TimeSpan.FromMilliseconds(20));
await fakeWorker.SendHeartbeatAsync(
configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468);
await WaitUntilAsync(
() => client.ProcessId == 2468 && client.LastHeartbeatAt > previousHeartbeat,
TestTimeout);
Assert.Equal(WorkerClientState.Ready, client.State);
}
[Fact]
public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand()
{
@@ -284,6 +284,26 @@ public sealed class FakeWorkerHarness : IAsyncDisposable
cancellationToken).ConfigureAwait(false);
}
public async Task SendHeartbeatAsync(
WorkerState state = WorkerState.Ready,
CancellationToken cancellationToken = default,
Action<WorkerHeartbeat>? configureHeartbeat = null)
{
WorkerHeartbeat heartbeat = new()
{
WorkerProcessId = DefaultWorkerProcessId,
State = state,
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
};
configureHeartbeat?.Invoke(heartbeat);
await _writer.WriteAsync(
CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerHeartbeat = heartbeat),
cancellationToken).ConfigureAwait(false);
}
public async Task SendShutdownAckAsync(
ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
CancellationToken cancellationToken = default)
@@ -1,4 +1,5 @@
using System.IO.Pipes;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
@@ -151,6 +152,24 @@ public sealed class WorkerClientTests
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
await Task.Delay(TimeSpan.FromMilliseconds(20));
await pipePair.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: 9876));
await WaitUntilAsync(
() => client.ProcessId == 9876 && client.LastHeartbeatAt > previousHeartbeat,
TestTimeout);
Assert.Equal(WorkerClientState.Ready, client.State);
}
[Fact]
public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient()
{
@@ -276,6 +295,21 @@ public sealed class WorkerClientTests
});
}
private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId)
{
return CreateWorkerEnvelope(
correlationId: string.Empty,
sequence: 20,
envelope => envelope.WorkerHeartbeat = new WorkerHeartbeat
{
WorkerProcessId = workerProcessId,
State = WorkerState.Ready,
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
PendingCommandCount = 0,
OutboundEventQueueDepth = 0,
});
}
private static WorkerEnvelope CreateWorkerEnvelope(
string correlationId,
ulong sequence,
@@ -30,7 +30,7 @@ public sealed class WorkerApplicationTests
Assert.Equal("mxaccess-gateway-123-session-1", entry.Fields["pipe_name"]);
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, entry.Fields["protocol_version"]);
Assert.Equal("[redacted]", entry.Fields["nonce"]);
Assert.Equal("WorkerPipeHandshakeSucceeded", logger.Entries[1].EventName);
Assert.Equal("WorkerPipeSessionCompleted", logger.Entries[1].EventName);
}
[Fact]
@@ -1,10 +1,15 @@
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.Ipc;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.Ipc;
@@ -28,7 +33,9 @@ public sealed class WorkerPipeClientTests
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
WorkerPipeClient client = new(connectTimeoutMilliseconds: 5000);
WorkerPipeClient client = new(
connectTimeoutMilliseconds: 5000,
(stream, options) => CreateSession(stream, options));
Task clientTask = client.RunAsync(workerOptions);
await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null);
@@ -56,6 +63,86 @@ public sealed class WorkerPipeClientTests
WorkerEnvelope ready = await reader.ReadAsync();
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);
await writer.WriteAsync(new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = "session-1",
Sequence = 2,
WorkerShutdown = new WorkerShutdown
{
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
Reason = "test-complete",
},
});
WorkerEnvelope shutdownAck = await reader.ReadAsync();
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, shutdownAck.BodyCase);
await clientTask;
}
private static WorkerPipeSession CreateSession(
Stream stream,
WorkerFrameProtocolOptions options)
{
return new WorkerPipeSession(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => 1234,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromSeconds(30),
HeartbeatGrace = TimeSpan.FromSeconds(30),
},
() => new FakeRuntimeSession());
}
private sealed class FakeRuntimeSession : IWorkerRuntimeSession
{
public Task<WorkerReady> StartAsync(
int workerProcessId,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId,
MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
});
}
public Task<MxCommandReply> DispatchAsync(StaCommand command)
{
return Task.FromResult(new MxCommandReply
{
SessionId = command.SessionId,
CorrelationId = command.CorrelationId,
Kind = command.Kind,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
});
}
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
return new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty);
}
public void RequestShutdown()
{
}
public void Dispose()
{
}
}
}
@@ -1,11 +1,16 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Ipc;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.Ipc;
@@ -147,6 +152,127 @@ public sealed class WorkerPipeSessionTests
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code);
}
[Fact]
public async Task RunAsync_SendsHeartbeatPayloadFromRuntimeSnapshot()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new();
runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 2,
outboundEventQueueDepth: 3,
lastEventSequence: 42,
currentCommandCorrelationId: "current-command"));
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
HeartbeatGrace = TimeSpan.FromSeconds(5),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
await ThrowIfCompletedAsync(runTask);
WorkerEnvelope heartbeat = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerHeartbeat,
cancellation.Token);
Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State);
Assert.Equal(1234, heartbeat.WorkerHeartbeat.WorkerProcessId);
Assert.Equal(2u, heartbeat.WorkerHeartbeat.PendingCommandCount);
Assert.Equal(3u, heartbeat.WorkerHeartbeat.OutboundEventQueueDepth);
Assert.Equal(42UL, heartbeat.WorkerHeartbeat.LastEventSequence);
Assert.Equal("current-command", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId);
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
[Fact]
public async Task RunAsync_WhenCommandIsExecuting_HeartbeatReportsCurrentCorrelation()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new()
{
BlockDispatch = true,
};
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
HeartbeatGrace = TimeSpan.FromSeconds(5),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
await pipePair.GatewayWriter.WriteAsync(
CreateCommandEnvelope("command-1"),
cancellation.Token);
Assert.True(runtime.DispatchStarted.Wait(TimeSpan.FromSeconds(2)));
WorkerEnvelope heartbeat = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerHeartbeat,
envelope => envelope.WorkerHeartbeat.CurrentCommandCorrelationId == "command-1",
cancellation.Token);
Assert.Equal("command-1", heartbeat.WorkerHeartbeat.CurrentCommandCorrelationId);
Assert.Equal(WorkerState.ExecutingCommand, heartbeat.WorkerHeartbeat.State);
runtime.ReleaseDispatch();
WorkerEnvelope reply = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerCommandReply,
cancellation.Token);
Assert.Equal("command-1", reply.CorrelationId);
Assert.Equal(ProtocolStatusCode.Ok, reply.WorkerCommandReply.Reply.ProtocolStatus.Code);
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
[Fact]
public async Task RunAsync_WhenStaActivityIsStale_WritesWatchdogFault()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new();
runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow - TimeSpan.FromSeconds(5),
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: "stuck-command"));
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
HeartbeatGrace = TimeSpan.FromMilliseconds(50),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
WorkerEnvelope fault = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerFault,
cancellation.Token);
Assert.Equal(WorkerFaultCategory.StaHung, fault.WorkerFault.Category);
Assert.Equal("stuck-command", fault.WorkerFault.CommandMethod);
Assert.Contains("STA activity is stale", fault.WorkerFault.DiagnosticMessage);
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
private static WorkerPipeSession CreateSession(
Stream inbound,
Stream outbound,
@@ -159,6 +285,21 @@ public sealed class WorkerPipeSessionTests
() => 1234);
}
private static WorkerPipeSession CreatePipeSession(
Stream stream,
FakeRuntimeSession runtime,
WorkerPipeSessionOptions sessionOptions)
{
WorkerFrameProtocolOptions options = CreateOptions();
return new WorkerPipeSession(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => 1234,
sessionOptions,
() => runtime);
}
private static WorkerFrameProtocolOptions CreateOptions()
{
return new WorkerFrameProtocolOptions(
@@ -185,6 +326,119 @@ public sealed class WorkerPipeSessionTests
};
}
private static WorkerEnvelope CreateCommandEnvelope(string correlationId)
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = 2,
CorrelationId = correlationId,
WorkerCommand = new WorkerCommand
{
Command = new MxCommand
{
Kind = MxCommandKind.Ping,
Ping = new PingCommand
{
Message = "ping",
},
},
EnqueueTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
},
};
}
private static WorkerEnvelope CreateShutdownEnvelope()
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = 3,
WorkerShutdown = new WorkerShutdown
{
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
Reason = "test-complete",
},
};
}
private static async Task CompleteGatewayHandshakeAsync(
PipePair pipePair,
CancellationToken cancellationToken)
{
await pipePair.GatewayWriter
.WriteAsync(CreateGatewayHelloEnvelope(), cancellationToken)
.ConfigureAwait(false);
WorkerEnvelope hello = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false);
WorkerEnvelope ready = await pipePair.GatewayReader.ReadAsync(cancellationToken).ConfigureAwait(false);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, hello.BodyCase);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);
}
private static async Task SendShutdownAndWaitAsync(
PipePair pipePair,
Task runTask,
CancellationToken cancellationToken)
{
await pipePair.GatewayWriter
.WriteAsync(CreateShutdownEnvelope(), cancellationToken)
.ConfigureAwait(false);
WorkerEnvelope shutdownAck = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerShutdownAck,
cancellationToken);
Assert.Equal(ProtocolStatusCode.Ok, shutdownAck.WorkerShutdownAck.Status.Code);
Task completedTask = await Task
.WhenAny(runTask, Task.Delay(TimeSpan.FromSeconds(2), cancellationToken))
.ConfigureAwait(false);
Assert.Same(runTask, completedTask);
await runTask.ConfigureAwait(false);
}
private static async Task ThrowIfCompletedAsync(Task task)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
if (task.IsCompleted)
{
await task;
}
}
private static Task<WorkerEnvelope> ReadUntilAsync(
WorkerFrameReader reader,
WorkerEnvelope.BodyOneofCase expectedBody,
CancellationToken cancellationToken)
{
return ReadUntilAsync(
reader,
expectedBody,
_ => true,
cancellationToken);
}
private static async Task<WorkerEnvelope> ReadUntilAsync(
WorkerFrameReader reader,
WorkerEnvelope.BodyOneofCase expectedBody,
Func<WorkerEnvelope, bool> predicate,
CancellationToken cancellationToken)
{
while (true)
{
WorkerEnvelope envelope = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase == expectedBody && predicate(envelope))
{
return envelope;
}
}
}
private static WorkerEnvelope[] ReadWrittenFrames(
MemoryStream stream,
WorkerFrameProtocolOptions options)
@@ -219,4 +473,157 @@ public sealed class WorkerPipeSessionTests
buffer[2] = (byte)(value >> 16);
buffer[3] = (byte)(value >> 24);
}
private sealed class FakeRuntimeSession : IWorkerRuntimeSession
{
private readonly ManualResetEventSlim releaseDispatch = new(false);
private readonly object gate = new();
private WorkerRuntimeHeartbeatSnapshot snapshot = new(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty);
public ManualResetEventSlim DispatchStarted { get; } = new(false);
public bool BlockDispatch { get; set; }
public Task<WorkerReady> StartAsync(
int workerProcessId,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId,
MxaccessClsid = MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
});
}
public Task<MxCommandReply> DispatchAsync(StaCommand command)
{
return Task.Run(
() =>
{
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
command.CorrelationId));
DispatchStarted.Set();
if (BlockDispatch)
{
releaseDispatch.Wait(TimeSpan.FromSeconds(5));
}
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty));
return new MxCommandReply
{
SessionId = command.SessionId,
CorrelationId = command.CorrelationId,
Kind = command.Kind,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
};
});
}
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
lock (gate)
{
return snapshot;
}
}
public void RequestShutdown()
{
releaseDispatch.Set();
}
public void ReleaseDispatch()
{
releaseDispatch.Set();
}
public void SetSnapshot(WorkerRuntimeHeartbeatSnapshot value)
{
lock (gate)
{
snapshot = value;
}
}
public void Dispose()
{
releaseDispatch.Set();
releaseDispatch.Dispose();
DispatchStarted.Dispose();
}
}
private sealed class PipePair : IDisposable
{
private readonly NamedPipeServerStream gatewayStream;
private PipePair(
NamedPipeServerStream gatewayStream,
NamedPipeClientStream workerStream)
{
this.gatewayStream = gatewayStream;
WorkerStream = workerStream;
WorkerFrameProtocolOptions options = CreateOptions();
GatewayReader = new WorkerFrameReader(gatewayStream, options);
GatewayWriter = new WorkerFrameWriter(gatewayStream, options);
}
public Stream WorkerStream { get; }
public WorkerFrameReader GatewayReader { get; }
public WorkerFrameWriter GatewayWriter { get; }
public static async Task<PipePair> CreateAsync(CancellationToken cancellationToken)
{
string pipeName = $"mxaccessgw-worker-session-tests-{Guid.NewGuid():N}";
NamedPipeServerStream gatewayStream = new(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
NamedPipeClientStream workerStream = new(
".",
pipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous);
Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync();
await Task
.Run(() => workerStream.Connect(5000), cancellationToken)
.ConfigureAwait(false);
await waitForConnectionTask.ConfigureAwait(false);
return new PipePair(gatewayStream, workerStream);
}
public void Dispose()
{
WorkerStream.Dispose();
gatewayStream.Dispose();
}
}
}
+14 -2
View File
@@ -1,4 +1,5 @@
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
@@ -11,6 +12,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
public const int DefaultConnectTimeoutMilliseconds = 30000;
private readonly int _connectTimeoutMilliseconds;
private readonly Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> _sessionFactory;
public WorkerPipeClient()
: this(DefaultConnectTimeoutMilliseconds)
@@ -18,6 +20,15 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
}
public WorkerPipeClient(int connectTimeoutMilliseconds)
: this(
connectTimeoutMilliseconds,
(stream, frameOptions) => new WorkerPipeSession(stream, frameOptions))
{
}
public WorkerPipeClient(
int connectTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> sessionFactory)
{
if (connectTimeoutMilliseconds <= 0)
{
@@ -26,6 +37,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
"Worker pipe connect timeout must be greater than zero.");
}
_sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory));
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
}
@@ -48,8 +60,8 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = new(pipe, frameOptions);
await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = _sessionFactory(pipe, frameOptions);
await session.RunAsync(cancellationToken).ConfigureAwait(false);
}
private Task ConnectAsync(
+329 -5
View File
@@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Ipc;
@@ -13,10 +14,14 @@ public sealed class WorkerPipeSession
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Func<int> _processIdProvider;
private readonly Func<IWorkerRuntimeSession> _runtimeSessionFactory;
private readonly WorkerPipeSessionOptions _sessionOptions;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private MxAccessStaSession? _mxAccessStaSession;
private IWorkerRuntimeSession? _runtimeSession;
private long _nextSequence;
private WorkerState _state = WorkerState.Starting;
private bool _watchdogFaultSent;
public WorkerPipeSession(
Stream stream,
@@ -34,11 +39,49 @@ public sealed class WorkerPipeSession
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider)
: this(
reader,
writer,
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession())
{
}
public WorkerPipeSession(
WorkerFrameReader reader,
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider,
WorkerPipeSessionOptions sessionOptions,
Func<IWorkerRuntimeSession> runtimeSessionFactory)
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_options = options ?? throw new ArgumentNullException(nameof(options));
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
_sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions));
_runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory));
_sessionOptions.Validate();
}
public async Task RunAsync(CancellationToken cancellationToken = default)
{
_runtimeSession = _runtimeSessionFactory();
try
{
await CompleteStartupHandshakeAsync(
token => _runtimeSession.StartAsync(_processIdProvider(), token),
cancellationToken).ConfigureAwait(false);
await RunMessageLoopAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
_runtimeSession?.Dispose();
_runtimeSession = null;
_state = WorkerState.Stopped;
}
}
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
@@ -76,11 +119,14 @@ public sealed class WorkerPipeSession
try
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
_state = WorkerState.Handshaking;
ValidateGatewayHello(envelope);
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
_state = WorkerState.InitializingSta;
WorkerReady ready = await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(ready, cancellationToken).ConfigureAwait(false);
_state = WorkerState.Ready;
}
catch (WorkerFrameProtocolException exception)
{
@@ -140,6 +186,174 @@ public sealed class WorkerPipeSession
return _writer.WriteAsync(CreateEnvelope(ready), cancellationToken);
}
private async Task RunMessageLoopAsync(CancellationToken cancellationToken)
{
using CancellationTokenSource heartbeatCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
Task heartbeatTask = RunHeartbeatLoopAsync(heartbeatCancellation.Token);
try
{
while (!cancellationToken.IsCancellationRequested)
{
Task<WorkerEnvelope> readTask = _reader.ReadAsync(cancellationToken);
Task completedTask = await Task.WhenAny(readTask, heartbeatTask).ConfigureAwait(false);
if (completedTask == heartbeatTask)
{
await heartbeatTask.ConfigureAwait(false);
}
WorkerEnvelope envelope = await readTask.ConfigureAwait(false);
bool keepReading = await DispatchGatewayEnvelopeAsync(envelope, cancellationToken).ConfigureAwait(false);
if (!keepReading)
{
return;
}
}
}
finally
{
heartbeatCancellation.Cancel();
try
{
await heartbeatTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}
}
private async Task<bool> DispatchGatewayEnvelopeAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
switch (envelope.BodyCase)
{
case WorkerEnvelope.BodyOneofCase.WorkerCommand:
_ = ProcessCommandAsync(envelope, cancellationToken);
return true;
case WorkerEnvelope.BodyOneofCase.WorkerShutdown:
await ShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false);
return false;
case WorkerEnvelope.BodyOneofCase.WorkerCancel:
return true;
default:
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
$"Worker received unexpected gateway envelope body {envelope.BodyCase}.");
}
}
private async Task ProcessCommandAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
IWorkerRuntimeSession runtimeSession = _runtimeSession
?? throw new InvalidOperationException("Worker runtime session has not been initialized.");
WorkerCommand workerCommand = envelope.WorkerCommand;
MxCommand command = workerCommand.Command;
StaCommand staCommand = new(
_options.SessionId,
envelope.CorrelationId,
command,
workerCommand.EnqueueTimestamp,
cancellationToken);
try
{
MxCommandReply reply = await runtimeSession.DispatchAsync(staCommand).ConfigureAwait(false);
await _writer
.WriteAsync(
CreateEnvelope(new WorkerCommandReply
{
Reply = reply,
CompletedTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
}),
cancellationToken)
.ConfigureAwait(false);
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
_state = WorkerState.Faulted;
await TryWriteFaultAsync(
CreateFault(
WorkerFaultCategory.MxaccessCommandFailed,
staCommand.MethodName,
exception),
cancellationToken).ConfigureAwait(false);
}
}
private async Task ShutdownAsync(
WorkerShutdown shutdown,
CancellationToken cancellationToken)
{
_state = WorkerState.ShuttingDown;
_runtimeSession?.RequestShutdown();
await _writer
.WriteAsync(
CreateEnvelope(
new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = string.IsNullOrWhiteSpace(shutdown.Reason)
? "Worker shutdown accepted."
: $"Worker shutdown accepted: {shutdown.Reason}",
},
}),
cancellationToken)
.ConfigureAwait(false);
}
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(_sessionOptions.HeartbeatInterval, cancellationToken).ConfigureAwait(false);
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
continue;
}
WorkerRuntimeHeartbeatSnapshot snapshot = runtimeSession.CaptureHeartbeat();
await _writer
.WriteAsync(CreateEnvelope(CreateHeartbeat(snapshot)), cancellationToken)
.ConfigureAwait(false);
await ReportWatchdogFaultIfNeededAsync(snapshot, cancellationToken).ConfigureAwait(false);
}
}
private async Task ReportWatchdogFaultIfNeededAsync(
WorkerRuntimeHeartbeatSnapshot snapshot,
CancellationToken cancellationToken)
{
TimeSpan staleFor = DateTimeOffset.UtcNow - snapshot.LastStaActivityUtc;
if (staleFor <= _sessionOptions.HeartbeatGrace)
{
_watchdogFaultSent = false;
return;
}
if (_watchdogFaultSent)
{
return;
}
_watchdogFaultSent = true;
await TryWriteFaultAsync(
CreateFault(
WorkerFaultCategory.StaHung,
snapshot.CurrentCommandCorrelationId,
$"STA activity is stale by {staleFor}."),
cancellationToken).ConfigureAwait(false);
}
private async Task TryWriteFaultAsync(
WorkerFrameProtocolException exception,
CancellationToken cancellationToken)
@@ -178,6 +392,25 @@ public sealed class WorkerPipeSession
}
}
private async Task TryWriteFaultAsync(
WorkerFault fault,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(fault), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The runtime fault remains observable through worker exit or pipe closure.
}
}
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
{
return CreateBaseEnvelope(hello);
@@ -193,6 +426,21 @@ public sealed class WorkerPipeSession
return CreateBaseEnvelope(fault);
}
private WorkerEnvelope CreateEnvelope(WorkerCommandReply reply)
{
return CreateBaseEnvelope(reply);
}
private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck)
{
return CreateBaseEnvelope(shutdownAck);
}
private WorkerEnvelope CreateEnvelope(WorkerHeartbeat heartbeat)
{
return CreateBaseEnvelope(heartbeat);
}
private WorkerEnvelope CreateBaseEnvelope(WorkerHello body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
@@ -214,6 +462,28 @@ public sealed class WorkerPipeSession
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerCommandReply body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.CorrelationId = body.Reply?.CorrelationId ?? string.Empty;
envelope.WorkerCommandReply = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerShutdownAck = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerHeartbeat body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerHeartbeat = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope()
{
return new WorkerEnvelope
@@ -231,21 +501,39 @@ public sealed class WorkerPipeSession
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
{
_mxAccessStaSession = new MxAccessStaSession();
_runtimeSession = new MxAccessStaSession();
try
{
return await _mxAccessStaSession
return await _runtimeSession
.StartAsync(_processIdProvider(), cancellationToken)
.ConfigureAwait(false);
}
catch
{
_mxAccessStaSession.Dispose();
_mxAccessStaSession = null;
_runtimeSession.Dispose();
_runtimeSession = null;
throw;
}
}
private WorkerHeartbeat CreateHeartbeat(WorkerRuntimeHeartbeatSnapshot snapshot)
{
WorkerState state = string.IsNullOrWhiteSpace(snapshot.CurrentCommandCorrelationId)
? _state
: WorkerState.ExecutingCommand;
return new WorkerHeartbeat
{
WorkerProcessId = _processIdProvider(),
State = state,
LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(snapshot.LastStaActivityUtc),
PendingCommandCount = snapshot.PendingCommandCount,
OutboundEventQueueDepth = snapshot.OutboundEventQueueDepth,
LastEventSequence = snapshot.LastEventSequence,
CurrentCommandCorrelationId = snapshot.CurrentCommandCorrelationId,
};
}
private WorkerReady CreateWorkerReady()
{
return new WorkerReady
@@ -295,6 +583,42 @@ public sealed class WorkerPipeSession
return fault;
}
private static WorkerFault CreateFault(
WorkerFaultCategory category,
string commandMethod,
Exception exception)
{
WorkerFault fault = CreateFault(
category,
commandMethod,
exception.Message);
fault.ExceptionType = exception.GetType().FullName ?? string.Empty;
fault.ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = exception.Message,
};
return fault;
}
private static WorkerFault CreateFault(
WorkerFaultCategory category,
string commandMethod,
string diagnosticMessage)
{
return new WorkerFault
{
Category = category,
CommandMethod = commandMethod ?? string.Empty,
DiagnosticMessage = diagnosticMessage,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = diagnosticMessage,
},
};
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
@@ -0,0 +1,36 @@
using System;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerPipeSessionOptions
{
public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(5);
public static readonly TimeSpan DefaultHeartbeatGrace = TimeSpan.FromSeconds(15);
public WorkerPipeSessionOptions()
{
HeartbeatInterval = DefaultHeartbeatInterval;
HeartbeatGrace = DefaultHeartbeatGrace;
}
public TimeSpan HeartbeatInterval { get; set; }
public TimeSpan HeartbeatGrace { get; set; }
public void Validate()
{
if (HeartbeatInterval <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(HeartbeatInterval),
"Worker heartbeat interval must be greater than zero.");
}
if (HeartbeatGrace <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(HeartbeatGrace),
"Worker heartbeat grace must be greater than zero.");
}
}
}
@@ -0,0 +1,20 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.MxAccess;
public interface IWorkerRuntimeSession : IDisposable
{
Task<WorkerReady> StartAsync(
int workerProcessId,
CancellationToken cancellationToken = default);
Task<MxCommandReply> DispatchAsync(StaCommand command);
WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat();
void RequestShutdown();
}
@@ -7,7 +7,7 @@ using MxGateway.Worker.Sta;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessStaSession : IDisposable
public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
@@ -68,6 +68,30 @@ public sealed class MxAccessStaSession : IDisposable
return commandDispatcher.DispatchAsync(command);
}
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
uint pendingCommandCount = 0;
string currentCommandCorrelationId = string.Empty;
if (commandDispatcher is not null)
{
pendingCommandCount = (uint)commandDispatcher.PendingCommandCount;
currentCommandCorrelationId = commandDispatcher.CurrentCommandCorrelationId;
}
return new WorkerRuntimeHeartbeatSnapshot(
staRuntime.LastActivityUtc,
pendingCommandCount,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId);
}
public void RequestShutdown()
{
commandDispatcher?.RequestShutdown();
}
public Task<IReadOnlyList<RegisteredServerHandle>> GetRegisteredServerHandlesAsync(
CancellationToken cancellationToken = default)
{
@@ -101,7 +125,7 @@ public sealed class MxAccessStaSession : IDisposable
return;
}
commandDispatcher?.RequestShutdown();
RequestShutdown();
if (session is not null)
{
@@ -0,0 +1,30 @@
using System;
namespace MxGateway.Worker.MxAccess;
public sealed class WorkerRuntimeHeartbeatSnapshot
{
public WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset lastStaActivityUtc,
uint pendingCommandCount,
uint outboundEventQueueDepth,
ulong lastEventSequence,
string currentCommandCorrelationId)
{
LastStaActivityUtc = lastStaActivityUtc;
PendingCommandCount = pendingCommandCount;
OutboundEventQueueDepth = outboundEventQueueDepth;
LastEventSequence = lastEventSequence;
CurrentCommandCorrelationId = currentCommandCorrelationId ?? string.Empty;
}
public DateTimeOffset LastStaActivityUtc { get; }
public uint PendingCommandCount { get; }
public uint OutboundEventQueueDepth { get; }
public ulong LastEventSequence { get; }
public string CurrentCommandCorrelationId { get; }
}
+1 -1
View File
@@ -84,7 +84,7 @@ public static class WorkerApplication
pipeClient.RunAsync(options).GetAwaiter().GetResult();
logger.Information("WorkerPipeHandshakeSucceeded", new Dictionary<string, object?>
logger.Information("WorkerPipeSessionCompleted", new Dictionary<string, object?>
{
["session_id"] = options.SessionId,
["pipe_name"] = options.PipeName,