Harden worker-client heartbeat watchdog and event backpressure

Server-031: HeartbeatLoopAsync now skips the HeartbeatExpired fault
while a command is in flight on the gateway-worker pipe, up to
WorkerClientOptions.HeartbeatStuckCeiling (75s default) — a heartbeat
gap caused by a slow STA command or an event-drain write burst no
longer faults a healthy worker. Mirrors the worker-side Worker-023
guard. A command older than the ceiling still faults so a genuinely
stuck COM call cannot hide the worker indefinitely.

Server-032: EnqueueWorkerEventAsync now honors the configured
EventChannelFullModeTimeout by awaiting WriteAsync against the
wait-mode channel, instead of faulting on the first missed slot with
the non-blocking TryWrite. A transient consumer hiccup is absorbed up
to the timeout; the overflow diagnostic names the channel depth,
capacity, and the actionable fix.

Adds the Server-031 and Server-032 findings entries and WorkerClient
regression tests covering both.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-21 14:20:09 -04:00
parent 099d4783b0
commit cec84bf572
4 changed files with 327 additions and 11 deletions
+95 -10
View File
@@ -389,7 +389,17 @@ public sealed class WorkerClient : IWorkerClient
}
}
/// <summary>Monitors worker heartbeat and detects stale sessions.</summary>
/// <summary>
/// Monitors worker heartbeat and detects stale sessions. Mirrors
/// Worker-023 on the worker side: while a command is in flight on the
/// gateway↔worker pipe, the heartbeat watchdog is suppressed up to
/// <see cref="WorkerClientOptions.HeartbeatStuckCeiling"/> — the worker
/// may be busy executing a slow STA command and the heartbeat write may
/// be queued behind a long event-drain burst (Server-031), neither of
/// which indicates the worker is actually hung. Once the oldest pending
/// command exceeds the ceiling, the fault fires anyway so a truly stuck
/// COM call doesn't hide the worker forever.
/// </summary>
private async Task HeartbeatLoopAsync()
{
try
@@ -409,6 +419,17 @@ public sealed class WorkerClient : IWorkerClient
continue;
}
// Server-031: if a command is in flight and hasn't yet exceeded
// the stuck-command ceiling, the gap is more likely caused by
// pipe-write contention (event drain holding the writer lock)
// or a legitimately slow STA command than by a hung worker.
// Wait for the ceiling before faulting on heartbeat alone.
if (TryGetOldestPendingCommandAge(out TimeSpan oldestCommandAge)
&& oldestCommandAge <= _options.HeartbeatStuckCeiling)
{
continue;
}
_metrics?.HeartbeatFailed(SessionId);
SetFaulted(
WorkerClientErrorCode.HeartbeatExpired,
@@ -421,6 +442,35 @@ public sealed class WorkerClient : IWorkerClient
}
}
/// <summary>
/// Returns the age of the oldest pending command on the worker pipe,
/// measured via <see cref="TimeProvider.GetElapsedTime(long)"/> against
/// <see cref="PendingCommand.StartTimestamp"/>, or <c>false</c> when no
/// commands are pending. Used by the heartbeat watchdog (Server-031)
/// to decide whether a heartbeat gap is plausibly the result of
/// pipe-write contention rather than a hung worker.
/// </summary>
private bool TryGetOldestPendingCommandAge(out TimeSpan oldestAge)
{
long oldestStart = long.MaxValue;
foreach (PendingCommand pending in _pendingCommands.Values)
{
if (pending.StartTimestamp < oldestStart)
{
oldestStart = pending.StartTimestamp;
}
}
if (oldestStart == long.MaxValue)
{
oldestAge = TimeSpan.Zero;
return false;
}
oldestAge = _timeProvider.GetElapsedTime(oldestStart);
return true;
}
/// <summary>Routes received envelope to appropriate handler.</summary>
/// <param name="envelope">The envelope to dispatch.</param>
/// <param name="cancellationToken">Cancellation token.</param>
@@ -457,7 +507,19 @@ public sealed class WorkerClient : IWorkerClient
}
}
/// <summary>Enqueues a worker event for client consumption.</summary>
/// <summary>
/// Enqueues a worker event for client consumption. Server-032: the
/// channel is configured with <see cref="BoundedChannelFullMode.Wait"/>
/// and a brief consumer hiccup is now tolerated for up to
/// <see cref="WorkerClientOptions.EventChannelFullModeTimeout"/>
/// (default 5s) before the worker is faulted. Pre-Server-032 the code
/// used <c>TryWrite</c> (non-blocking) which never honored the
/// configured <c>FullModeTimeout</c> — the worker faulted on the first
/// missed slot even though the wait-mode channel would have absorbed
/// the burst. The diagnostic now names the capacity, current depth, and
/// the actionable fix (attach <c>StreamEvents</c> or raise
/// <c>MxGateway:Events:QueueCapacity</c>).
/// </summary>
/// <param name="workerEvent">The event to enqueue.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private async Task EnqueueWorkerEventAsync(
@@ -469,18 +531,41 @@ public sealed class WorkerClient : IWorkerClient
_metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString());
}
if (!_events.Writer.TryWrite(workerEvent))
if (_events.Writer.TryWrite(workerEvent))
{
_metrics?.QueueOverflow("worker-events");
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
"Worker event channel rejected an event.",
null);
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
_metrics?.SetWorkerEventQueueDepth(queueDepth);
return;
}
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
_metrics?.SetWorkerEventQueueDepth(queueDepth);
// Channel is full; honor the configured wait timeout before declaring
// the consumer dead and faulting the worker (Server-032).
using CancellationTokenSource fullModeCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
fullModeCts.CancelAfter(_options.EventChannelFullModeTimeout);
try
{
await _events.Writer.WriteAsync(workerEvent, fullModeCts.Token).ConfigureAwait(false);
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
_metrics?.SetWorkerEventQueueDepth(queueDepth);
return;
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
// Only the full-mode timeout fired — the outer cancellation is
// a different concern and is rethrown by the await above when it
// triggers.
}
_metrics?.QueueOverflow("worker-events");
int depthAtOverflow = Volatile.Read(ref _eventQueueDepth);
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
$"Worker event channel rejected an event after waiting "
+ $"{_options.EventChannelFullModeTimeout.TotalMilliseconds:F0} ms; "
+ $"channel depth is {depthAtOverflow} of {_options.EventChannelCapacity} capacity. "
+ $"Attach a StreamEvents consumer or raise MxGateway:Events:QueueCapacity.",
null);
}
/// <summary>Completes pending command with worker reply.</summary>
@@ -12,6 +12,16 @@ public sealed class WorkerClientOptions
/// <summary>Default timeout when the event queue is full.</summary>
public static readonly TimeSpan DefaultEventChannelFullModeTimeout = TimeSpan.FromSeconds(5);
/// <summary>
/// Default ceiling on the in-flight-command heartbeat skip. Mirrors
/// <see cref="MxGateway.Worker.Ipc.WorkerPipeSessionOptions.DefaultHeartbeatStuckCeiling"/>
/// on the worker side (Worker-023). When a command has been in flight
/// longer than this, the gateway-side heartbeat watchdog fires
/// regardless of pending commands — a truly stuck COM call shouldn't
/// hide the worker forever.
/// </summary>
public static readonly TimeSpan DefaultHeartbeatStuckCeiling = TimeSpan.FromSeconds(75);
/// <summary>Initializes options with default values.</summary>
public WorkerClientOptions()
{
@@ -20,6 +30,7 @@ public sealed class WorkerClientOptions
EventChannelCapacity = 1_024;
EventChannelFullModeTimeout = DefaultEventChannelFullModeTimeout;
MaxPendingCommands = 128;
HeartbeatStuckCeiling = DefaultHeartbeatStuckCeiling;
}
/// <summary>Maximum allowed age of the last heartbeat before faulting the client.</summary>
@@ -31,9 +42,27 @@ public sealed class WorkerClientOptions
/// <summary>Maximum number of events buffered before backpressure is applied.</summary>
public int EventChannelCapacity { get; init; }
/// <summary>Time to wait for the event queue to drain before faulting.</summary>
/// <summary>
/// Time to wait for the gateway-side event channel to drain before
/// faulting the worker. Honored by <c>EnqueueWorkerEventAsync</c> via
/// <c>WriteAsync</c>; with the channel configured for
/// <c>BoundedChannelFullMode.Wait</c>, a transient backlog only faults
/// after the configured timeout has elapsed (Server-032). Pre-Server-032
/// the field was declared but unused — overflow faulted immediately.
/// </summary>
public TimeSpan EventChannelFullModeTimeout { get; init; }
/// <summary>Maximum number of concurrent pending commands.</summary>
public int MaxPendingCommands { get; init; }
/// <summary>
/// Server-031: ceiling on the in-flight-command heartbeat-skip. When
/// a command has been pending on the gateway↔worker pipe for longer
/// than this, the gateway-side <c>HeartbeatLoopAsync</c> fires the
/// <c>HeartbeatExpired</c> fault even if commands are still pending;
/// a truly stuck COM call shouldn't keep the watchdog suppressed
/// indefinitely. Mirrors Worker-023's <c>HeartbeatStuckCeiling</c> on
/// the worker side.
/// </summary>
public TimeSpan HeartbeatStuckCeiling { get; init; }
}
@@ -374,6 +374,144 @@ public sealed class WorkerClientTests
Assert.Equal(WorkerClientState.Faulted, client.State);
}
/// <summary>
/// Server-031 regression: while a command is in flight on the
/// gateway↔worker pipe and the oldest pending command is younger
/// than <see cref="WorkerClientOptions.HeartbeatStuckCeiling"/>, the
/// heartbeat watchdog must NOT fault on heartbeat-expired alone — the
/// gap is more likely caused by pipe-write contention than by a hung
/// worker. Mirrors Worker-023 on the worker side.
/// </summary>
[Fact]
public async Task HeartbeatMonitor_WhenCommandInFlightWithinCeiling_DoesNotFaultOnExpiredHeartbeat()
{
ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-20T13:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(
pipePair,
new WorkerClientOptions
{
HeartbeatGrace = TimeSpan.FromMilliseconds(80),
HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
EventChannelCapacity = 8,
HeartbeatStuckCeiling = TimeSpan.FromSeconds(30),
},
timeProvider: clock);
await CompleteHandshakeAsync(client, pipePair);
// Begin a command that the test never replies to — keeps the
// PendingCommand alive in `_pendingCommands` for the duration.
Task<WorkerCommandReply> pendingInvoke = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TestTimeout,
CancellationToken.None);
WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
// Advance well past HeartbeatGrace but well within HeartbeatStuckCeiling.
clock.Advance(TimeSpan.FromSeconds(2));
// Give the heartbeat monitor a few real check-intervals to observe the gap.
await Task.Delay(TimeSpan.FromMilliseconds(150));
Assert.Equal(WorkerClientState.Ready, client.State);
Assert.False(pendingInvoke.IsCompleted);
}
/// <summary>
/// Server-031 regression: once the oldest pending command exceeds
/// <see cref="WorkerClientOptions.HeartbeatStuckCeiling"/>, the
/// heartbeat watchdog fires anyway — a truly stuck COM call shouldn't
/// keep the watchdog suppressed indefinitely.
/// </summary>
[Fact]
public async Task HeartbeatMonitor_WhenPendingCommandExceedsStuckCeiling_FaultsClient()
{
ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-20T13:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(
pipePair,
new WorkerClientOptions
{
HeartbeatGrace = TimeSpan.FromMilliseconds(80),
HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
EventChannelCapacity = 8,
HeartbeatStuckCeiling = TimeSpan.FromMilliseconds(200),
},
timeProvider: clock);
await CompleteHandshakeAsync(client, pipePair);
Task<WorkerCommandReply> pendingInvoke = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TestTimeout,
CancellationToken.None);
await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
// Advance the clock past HeartbeatStuckCeiling. The worker pipe's
// PendingCommand.StartTimestamp uses TimeProvider.GetTimestamp(), so the
// ManualTimeProvider's GetElapsedTime sees the advanced gap.
clock.Advance(TimeSpan.FromSeconds(2));
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
/// <summary>
/// Server-032 regression: a transient burst that exceeds
/// <see cref="WorkerClientOptions.EventChannelCapacity"/> must be
/// absorbed for up to <see cref="WorkerClientOptions.EventChannelFullModeTimeout"/>
/// (the channel is configured for <c>BoundedChannelFullMode.Wait</c>);
/// only when the wait elapses without progress is the worker faulted,
/// and the diagnostic must name the channel capacity, depth, and
/// actionable remediation.
/// </summary>
[Fact]
public async Task EnqueueWorkerEvent_WhenChannelFullPastTimeout_FaultsWithRichDiagnostic()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(
pipePair,
new WorkerClientOptions
{
EventChannelCapacity = 4,
EventChannelFullModeTimeout = TimeSpan.FromMilliseconds(100),
HeartbeatGrace = TimeSpan.FromSeconds(30),
HeartbeatCheckInterval = TimeSpan.FromSeconds(1),
});
await CompleteHandshakeAsync(client, pipePair);
// Fill the channel plus one to force the overflow path. The gateway
// never opens a StreamEvents consumer so the events stay in the
// bounded channel.
for (ulong sequence = 1; sequence <= 6; sequence++)
{
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: sequence, MxEventFamily.OnDataChange));
}
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
// Reading the events channel after fault throws the propagated
// WorkerClientException carrying the rich diagnostic message.
WorkerClientException fault = await Assert.ThrowsAsync<WorkerClientException>(async () =>
{
await foreach (WorkerEvent _ in client.ReadEventsAsync(CancellationToken.None))
{
}
});
Assert.Contains("Worker event channel rejected", fault.Message);
Assert.Contains("of 4 capacity", fault.Message);
Assert.Contains("StreamEvents", fault.Message);
Assert.Contains("MxGateway:Events:QueueCapacity", fault.Message);
}
private static WorkerClient CreateClient(
PipePair pipePair,
WorkerClientOptions? options = null,