From cec84bf572048d899e15d71da1f81d92f2b7cc0e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 21 May 2026 14:20:09 -0400 Subject: [PATCH] Harden worker-client heartbeat watchdog and event backpressure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server-031: HeartbeatLoopAsync now skips the HeartbeatExpired fault while a command is in flight on the gateway-worker pipe, up to WorkerClientOptions.HeartbeatStuckCeiling (75s default) — a heartbeat gap caused by a slow STA command or an event-drain write burst no longer faults a healthy worker. Mirrors the worker-side Worker-023 guard. A command older than the ceiling still faults so a genuinely stuck COM call cannot hide the worker indefinitely. Server-032: EnqueueWorkerEventAsync now honors the configured EventChannelFullModeTimeout by awaiting WriteAsync against the wait-mode channel, instead of faulting on the first missed slot with the non-blocking TryWrite. A transient consumer hiccup is absorbed up to the timeout; the overflow diagnostic names the channel depth, capacity, and the actionable fix. Adds the Server-031 and Server-032 findings entries and WorkerClient regression tests covering both. Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Server/findings.md | 64 ++++++++ src/MxGateway.Server/Workers/WorkerClient.cs | 105 +++++++++++-- .../Workers/WorkerClientOptions.cs | 31 +++- .../Gateway/Workers/WorkerClientTests.cs | 138 ++++++++++++++++++ 4 files changed, 327 insertions(+), 11 deletions(-) diff --git a/code-reviews/Server/findings.md b/code-reviews/Server/findings.md index 92450f5..c7b47c5 100644 --- a/code-reviews/Server/findings.md +++ b/code-reviews/Server/findings.md @@ -504,3 +504,67 @@ Re-review pass at `a020350` — the cross-module sweep that resolved Server-015 **Recommendation:** Format both states into the exception message — `Session {SessionId} is not ready. Session state is {_state}; worker state is {workerClientState}.` (or `""` when `_workerClient` is null). Document on the method that the two states can diverge under load and that this branch is the fail-fast for that case. Add a regression test that flips `FakeWorkerClient.State` to a non-Ready value (e.g. `Handshaking`) while the session is `Ready` and asserts both pieces of state appear in the thrown `SessionManagerException.Message`. The deeper race investigation (should the gateway briefly wait for worker-Ready before failing? when does `WorkerClient.State` legitimately shift while the session is still `Ready`?) is out of scope for this finding but is worth a follow-up. **Resolution:** 2026-05-20 — Rewrote `GetReadyWorkerClient` so the `SessionManagerException` message includes both `_state` and `_workerClient.State` (or `""` for the null case): `"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}."`. Added XML doc on the method explaining the two-state contract and that this branch is the fail-fast for a state-divergence race. Added regression test `SessionManagerTests.InvokeAsync_WhenWorkerNotReadyButSessionReady_DiagnosticIncludesBothStates` that sets `FakeWorkerClient.State = WorkerClientState.Handshaking` while the session is `Ready` and asserts both `"Session state is Ready"` and `"worker state is Handshaking"` appear in the message; the test also pins `InvokeCount == 0` so the worker isn't called. The deeper race (should `GetReadyWorkerClient` retry briefly when state has just diverged?) remains open for follow-up. + +### Server-031 + +| Field | Value | +|---|---| +| Severity | Medium | +| Category | Concurrency & thread safety | +| Location | `src/MxGateway.Server/Workers/WorkerClient.cs:392-422` (gateway-side heartbeat watchdog); `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:588-617` (worker-side heartbeat loop); `src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs:14,67-76` (shared `_writeLock`) | +| Status | Open | + +**Description:** Surfaced during the 2026-05-20 cross-language e2e re-run against gateway `b794c46`. The .NET phase succeeded through `open-session`/`register`/`bulk-subscribe`/`bulk-read`/`bulk-unsubscribe`/`stream-events`/`write` but then failed on its third `advise` call with the Server-030 diagnostic `Session ... is not ready. Session state is Ready; worker state is Faulted.` The gateway stdout log records the underlying cause: **`Worker client faulted for session session-01a1a07fa59c489983a719821fa46e72: Worker heartbeat expired. Last heartbeat was at 2026-05-20T17:20:39.+00:00.`** — a real 15s+ gap with no `WorkerHeartbeat` envelope arriving from the worker. + +Investigation paths: + +1. **Shared `_writeLock` on the worker side.** `WorkerFrameWriter` serializes every pipe write (heartbeats, command replies, events, faults) through a single `SemaphoreSlim _writeLock` (`WorkerFrameWriter.cs:14`, `:67-76`). `RunEventDrainLoopAsync` (`WorkerPipeSession.cs:336-372`) writes events one at a time inside a `foreach`, each call to `_writer.WriteAsync` re-acquiring `_writeLock`. If the gateway-side read drains slowly and the OS-level named-pipe buffer fills, `_stream.WriteAsync` (`WorkerFrameWriter.cs:70`) blocks. The event-drain loop blocks holding the lock. `RunHeartbeatLoopAsync` (`WorkerPipeSession.cs:611-613`) then can't acquire `_writeLock` to send its 5s heartbeat. Heartbeats stall past the gateway's `HeartbeatGrace` (15s default) and `WorkerClient.HeartbeatLoopAsync` faults the session. + +2. **No prioritization between heartbeats and events.** Even without OS-level back-pressure, a backlog of events in the worker's `MxAccessEventQueue` (drained in batches of `EventDrainBatchSize`) can keep the writer lock held for many milliseconds at a time. Heartbeats can be delayed (though normally not past `HeartbeatGrace` unless something else is wrong). + +3. **Gateway-side heartbeat watchdog ignores in-flight commands.** `WorkerClient.HeartbeatLoopAsync` (`WorkerClient.cs:392-422`) checks only `_state == Ready` and `now - lastHeartbeatAt > HeartbeatGrace`. It does not check whether a command is in flight on the gateway↔worker pipe. The mirror of Worker-017's fix (worker-side watchdog skips `StaHung` while a command is in flight) does not exist on the gateway side. + +The .NET test pattern stresses the issue uniquely because each `dotnet run --project` rebuild between subcommands introduces multi-second client-side gaps; the worker's heartbeat path should still be alive (heartbeats are emitted by `RunHeartbeatLoopAsync` independently of gateway activity), but if the gateway is also blocked draining events from the channel into a non-existent `StreamEvents` consumer, the back-pressure-into-heartbeat chain bites first. + +**Recommendation:** Two changes worth landing together: + +1. **Decouple heartbeat writes from the event/reply lock.** Either (a) give heartbeats their own pipe `Stream` (likely impractical — one pipe per session), (b) introduce a priority queue in front of `WorkerFrameWriter` so heartbeats hop the line, or (c) interleave heartbeat checks inside `RunEventDrainLoopAsync` (e.g., after each event-batch write, post a heartbeat envelope if one is due). Option (c) is the smallest change. + +2. **Mirror Worker-017's "skip-while-command-in-flight" guard on the gateway side.** In `WorkerClient.HeartbeatLoopAsync`, when `_pendingCommands.Count > 0` and the oldest pending command is younger than some ceiling (e.g., 5× `HeartbeatGrace`), skip the fault. The worker may be busy executing a slow STA command and the heartbeat write may be queued behind a long event burst — neither indicates the worker is actually hung. + +Add a regression test that floods the worker's outbound event channel (e.g., via a high-rate STA fixture or a mock event source emitting at > 1000 events/s for several seconds) and asserts the worker is not faulted while the gateway has no `StreamEvents` consumer attached. + +**Resolution:** _(empty until closed; on close, record the fixing commit SHA, the date, and a one-line description of the fix)_ + +### Server-032 + +| Field | Value | +|---|---| +| Severity | Medium | +| Category | Error handling & resilience | +| Location | `src/MxGateway.Server/Workers/WorkerClient.cs:70-77,463-484` (gateway-side `_events` channel); `src/MxGateway.Server/Configuration/EventOptions.cs:8` (default capacity 10,000); `src/MxGateway.Server/Grpc/EventStreamService.cs` (consumer) | +| Status | Open | + +**Description:** Surfaced during the 2026-05-20 cross-language e2e re-run against gateway `b794c46`. The Java phase advised ~55 items (`item-handle 63`) before failing on the next `advise` call with the Server-030 diagnostic `Session ... is not ready. Session state is Ready; worker state is Faulted.`. The gateway stdout log records: **`Worker client faulted for session session-adfcc808da974808947e87db060c2b03: Worker event channel rejected an event.`** — the gateway-side per-session bounded event channel filled up and `Channel.Writer.TryWrite` returned `false`, triggering the fail-fast path in `EnqueueWorkerEventAsync` (`WorkerClient.cs:467-484`). + +The channel is configured as `Channel.CreateBounded(new BoundedChannelOptions(EventChannelCapacity) { ... FullMode = BoundedChannelFullMode.Wait ... })` (capacity defaults to `EventOptions.QueueCapacity = 10_000`). But `EnqueueWorkerEventAsync` uses **`TryWrite`** (non-blocking), so the configured `Wait` mode is moot — the writer always fails fast when full. This is consistent with `docs/DesignDecisions.md`'s "fail-fast event backpressure" policy (one subscriber per session, no producer-side queuing beyond the channel), but two facts make it sharp in practice: + +1. The e2e flow (and any realistic client) `advise`s many items BEFORE opening a long-running `StreamEvents` consumer. With no consumer, events accumulate at the in-rate (driven by the SCADA tags' change frequency). For `TestMachine_*.TestChangingInt` × ~55 advised items, the rig can fill 10,000 in well under a minute. + +2. The fail-fast threshold is "exactly at capacity." There is no overflow grace window. A momentary lull on the consumer side that lasts long enough for one extra event to arrive after the channel is full results in worker fault and session teardown. + +This is design-as-intended in the v1 sense, but it surfaces a behavioral contract that is **not currently documented**: clients must open `StreamEvents` BEFORE issuing `advise` against high-rate tags, or pace their `advise` calls below the (non-published) accumulation budget. None of the current docs (`gateway.md`, `docs/DesignDecisions.md`, the client READMEs) enforce or surface this requirement, and four of the five client CLIs (`go`, `python`, `rust`, `java`) hit it gracelessly in `scripts/run-client-e2e-tests.ps1`. + +The diagnostic `"Worker event channel rejected an event."` also does not name the actual channel (it says "Worker event channel" but the channel is gateway-owned), the current depth, or the capacity — only that it overflowed. Operators can't tell whether the threshold needs lifting or whether the consumer is genuinely missing. + +**Recommendation:** Three escalating options, pick at least the first and consider one of the others: + +1. **Document the contract.** In `gateway.md` and `docs/DesignDecisions.md`, state explicitly that `advise` produces events into the gateway-side per-session channel and that a `StreamEvents` consumer must be attached to drain it. Add the bound (`MxGateway:Events:QueueCapacity`, default 10,000) and the fault behavior (the worker is faulted; the session ends). Update `clients/*/README.md` to call out the requirement in the "advise" / "subscribe" sections. + +2. **Improve the diagnostic.** Format the channel depth and capacity into the fault message: `"Worker event channel rejected an event after {capacity} unconsumed events accumulated. Attach a StreamEvents consumer or increase MxGateway:Events:QueueCapacity."` + +3. **Add an overflow grace window.** Instead of fail-fast on the first `TryWrite == false`, count overflow events and only fault if N consecutive overflows happen within T ms (or, equivalently, switch to `WriteAsync` with a short timeout). This trades a tiny memory bump for resilience to consumer hiccups. Out of scope if v1 explicitly chose fail-fast for parity reasons — but worth raising for v2. + +Add a regression test that advises N items without an active `StreamEvents` consumer, lets the channel fill, and asserts the produced fault message contains the channel-depth diagnostic (#2) — gated so that #3 is not required. + +**Resolution:** _(empty until closed; on close, record the fixing commit SHA, the date, and a one-line description of the fix)_ diff --git a/src/MxGateway.Server/Workers/WorkerClient.cs b/src/MxGateway.Server/Workers/WorkerClient.cs index 7ee0785..37450d3 100644 --- a/src/MxGateway.Server/Workers/WorkerClient.cs +++ b/src/MxGateway.Server/Workers/WorkerClient.cs @@ -389,7 +389,17 @@ public sealed class WorkerClient : IWorkerClient } } - /// Monitors worker heartbeat and detects stale sessions. + /// + /// Monitors worker heartbeat and detects stale sessions. Mirrors + /// Worker-023 on the worker side: while a command is in flight on the + /// gateway↔worker pipe, the heartbeat watchdog is suppressed up to + /// — the worker + /// may be busy executing a slow STA command and the heartbeat write may + /// be queued behind a long event-drain burst (Server-031), neither of + /// which indicates the worker is actually hung. Once the oldest pending + /// command exceeds the ceiling, the fault fires anyway so a truly stuck + /// COM call doesn't hide the worker forever. + /// private async Task HeartbeatLoopAsync() { try @@ -409,6 +419,17 @@ public sealed class WorkerClient : IWorkerClient continue; } + // Server-031: if a command is in flight and hasn't yet exceeded + // the stuck-command ceiling, the gap is more likely caused by + // pipe-write contention (event drain holding the writer lock) + // or a legitimately slow STA command than by a hung worker. + // Wait for the ceiling before faulting on heartbeat alone. + if (TryGetOldestPendingCommandAge(out TimeSpan oldestCommandAge) + && oldestCommandAge <= _options.HeartbeatStuckCeiling) + { + continue; + } + _metrics?.HeartbeatFailed(SessionId); SetFaulted( WorkerClientErrorCode.HeartbeatExpired, @@ -421,6 +442,35 @@ public sealed class WorkerClient : IWorkerClient } } + /// + /// Returns the age of the oldest pending command on the worker pipe, + /// measured via against + /// , or false when no + /// commands are pending. Used by the heartbeat watchdog (Server-031) + /// to decide whether a heartbeat gap is plausibly the result of + /// pipe-write contention rather than a hung worker. + /// + private bool TryGetOldestPendingCommandAge(out TimeSpan oldestAge) + { + long oldestStart = long.MaxValue; + foreach (PendingCommand pending in _pendingCommands.Values) + { + if (pending.StartTimestamp < oldestStart) + { + oldestStart = pending.StartTimestamp; + } + } + + if (oldestStart == long.MaxValue) + { + oldestAge = TimeSpan.Zero; + return false; + } + + oldestAge = _timeProvider.GetElapsedTime(oldestStart); + return true; + } + /// Routes received envelope to appropriate handler. /// The envelope to dispatch. /// Cancellation token. @@ -457,7 +507,19 @@ public sealed class WorkerClient : IWorkerClient } } - /// Enqueues a worker event for client consumption. + /// + /// Enqueues a worker event for client consumption. Server-032: the + /// channel is configured with + /// and a brief consumer hiccup is now tolerated for up to + /// + /// (default 5s) before the worker is faulted. Pre-Server-032 the code + /// used TryWrite (non-blocking) which never honored the + /// configured FullModeTimeout — the worker faulted on the first + /// missed slot even though the wait-mode channel would have absorbed + /// the burst. The diagnostic now names the capacity, current depth, and + /// the actionable fix (attach StreamEvents or raise + /// MxGateway:Events:QueueCapacity). + /// /// The event to enqueue. /// Cancellation token. private async Task EnqueueWorkerEventAsync( @@ -469,18 +531,41 @@ public sealed class WorkerClient : IWorkerClient _metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString()); } - if (!_events.Writer.TryWrite(workerEvent)) + if (_events.Writer.TryWrite(workerEvent)) { - _metrics?.QueueOverflow("worker-events"); - SetFaulted( - WorkerClientErrorCode.ProtocolViolation, - "Worker event channel rejected an event.", - null); + int queueDepth = Interlocked.Increment(ref _eventQueueDepth); + _metrics?.SetWorkerEventQueueDepth(queueDepth); return; } - int queueDepth = Interlocked.Increment(ref _eventQueueDepth); - _metrics?.SetWorkerEventQueueDepth(queueDepth); + // Channel is full; honor the configured wait timeout before declaring + // the consumer dead and faulting the worker (Server-032). + using CancellationTokenSource fullModeCts = CancellationTokenSource + .CreateLinkedTokenSource(cancellationToken); + fullModeCts.CancelAfter(_options.EventChannelFullModeTimeout); + try + { + await _events.Writer.WriteAsync(workerEvent, fullModeCts.Token).ConfigureAwait(false); + int queueDepth = Interlocked.Increment(ref _eventQueueDepth); + _metrics?.SetWorkerEventQueueDepth(queueDepth); + return; + } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + { + // Only the full-mode timeout fired — the outer cancellation is + // a different concern and is rethrown by the await above when it + // triggers. + } + + _metrics?.QueueOverflow("worker-events"); + int depthAtOverflow = Volatile.Read(ref _eventQueueDepth); + SetFaulted( + WorkerClientErrorCode.ProtocolViolation, + $"Worker event channel rejected an event after waiting " + + $"{_options.EventChannelFullModeTimeout.TotalMilliseconds:F0} ms; " + + $"channel depth is {depthAtOverflow} of {_options.EventChannelCapacity} capacity. " + + $"Attach a StreamEvents consumer or raise MxGateway:Events:QueueCapacity.", + null); } /// Completes pending command with worker reply. diff --git a/src/MxGateway.Server/Workers/WorkerClientOptions.cs b/src/MxGateway.Server/Workers/WorkerClientOptions.cs index f9af162..a69d0f3 100644 --- a/src/MxGateway.Server/Workers/WorkerClientOptions.cs +++ b/src/MxGateway.Server/Workers/WorkerClientOptions.cs @@ -12,6 +12,16 @@ public sealed class WorkerClientOptions /// Default timeout when the event queue is full. public static readonly TimeSpan DefaultEventChannelFullModeTimeout = TimeSpan.FromSeconds(5); + /// + /// Default ceiling on the in-flight-command heartbeat skip. Mirrors + /// + /// on the worker side (Worker-023). When a command has been in flight + /// longer than this, the gateway-side heartbeat watchdog fires + /// regardless of pending commands — a truly stuck COM call shouldn't + /// hide the worker forever. + /// + public static readonly TimeSpan DefaultHeartbeatStuckCeiling = TimeSpan.FromSeconds(75); + /// Initializes options with default values. public WorkerClientOptions() { @@ -20,6 +30,7 @@ public sealed class WorkerClientOptions EventChannelCapacity = 1_024; EventChannelFullModeTimeout = DefaultEventChannelFullModeTimeout; MaxPendingCommands = 128; + HeartbeatStuckCeiling = DefaultHeartbeatStuckCeiling; } /// Maximum allowed age of the last heartbeat before faulting the client. @@ -31,9 +42,27 @@ public sealed class WorkerClientOptions /// Maximum number of events buffered before backpressure is applied. public int EventChannelCapacity { get; init; } - /// Time to wait for the event queue to drain before faulting. + /// + /// Time to wait for the gateway-side event channel to drain before + /// faulting the worker. Honored by EnqueueWorkerEventAsync via + /// WriteAsync; with the channel configured for + /// BoundedChannelFullMode.Wait, a transient backlog only faults + /// after the configured timeout has elapsed (Server-032). Pre-Server-032 + /// the field was declared but unused — overflow faulted immediately. + /// public TimeSpan EventChannelFullModeTimeout { get; init; } /// Maximum number of concurrent pending commands. public int MaxPendingCommands { get; init; } + + /// + /// Server-031: ceiling on the in-flight-command heartbeat-skip. When + /// a command has been pending on the gateway↔worker pipe for longer + /// than this, the gateway-side HeartbeatLoopAsync fires the + /// HeartbeatExpired fault even if commands are still pending; + /// a truly stuck COM call shouldn't keep the watchdog suppressed + /// indefinitely. Mirrors Worker-023's HeartbeatStuckCeiling on + /// the worker side. + /// + public TimeSpan HeartbeatStuckCeiling { get; init; } } diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs index f834163..6370de6 100644 --- a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs @@ -374,6 +374,144 @@ public sealed class WorkerClientTests Assert.Equal(WorkerClientState.Faulted, client.State); } + /// + /// Server-031 regression: while a command is in flight on the + /// gateway↔worker pipe and the oldest pending command is younger + /// than , the + /// heartbeat watchdog must NOT fault on heartbeat-expired alone — the + /// gap is more likely caused by pipe-write contention than by a hung + /// worker. Mirrors Worker-023 on the worker side. + /// + [Fact] + public async Task HeartbeatMonitor_WhenCommandInFlightWithinCeiling_DoesNotFaultOnExpiredHeartbeat() + { + ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-20T13:00:00Z", System.Globalization.CultureInfo.InvariantCulture)); + await using PipePair pipePair = await PipePair.CreateAsync(); + await using WorkerClient client = CreateClient( + pipePair, + new WorkerClientOptions + { + HeartbeatGrace = TimeSpan.FromMilliseconds(80), + HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20), + EventChannelCapacity = 8, + HeartbeatStuckCeiling = TimeSpan.FromSeconds(30), + }, + timeProvider: clock); + await CompleteHandshakeAsync(client, pipePair); + + // Begin a command that the test never replies to — keeps the + // PendingCommand alive in `_pendingCommands` for the duration. + Task pendingInvoke = client.InvokeAsync( + CreateCommand(MxCommandKind.Ping), + TestTimeout, + CancellationToken.None); + WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase); + + // Advance well past HeartbeatGrace but well within HeartbeatStuckCeiling. + clock.Advance(TimeSpan.FromSeconds(2)); + + // Give the heartbeat monitor a few real check-intervals to observe the gap. + await Task.Delay(TimeSpan.FromMilliseconds(150)); + + Assert.Equal(WorkerClientState.Ready, client.State); + Assert.False(pendingInvoke.IsCompleted); + } + + /// + /// Server-031 regression: once the oldest pending command exceeds + /// , the + /// heartbeat watchdog fires anyway — a truly stuck COM call shouldn't + /// keep the watchdog suppressed indefinitely. + /// + [Fact] + public async Task HeartbeatMonitor_WhenPendingCommandExceedsStuckCeiling_FaultsClient() + { + ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-20T13:00:00Z", System.Globalization.CultureInfo.InvariantCulture)); + await using PipePair pipePair = await PipePair.CreateAsync(); + await using WorkerClient client = CreateClient( + pipePair, + new WorkerClientOptions + { + HeartbeatGrace = TimeSpan.FromMilliseconds(80), + HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20), + EventChannelCapacity = 8, + HeartbeatStuckCeiling = TimeSpan.FromMilliseconds(200), + }, + timeProvider: clock); + await CompleteHandshakeAsync(client, pipePair); + + Task pendingInvoke = client.InvokeAsync( + CreateCommand(MxCommandKind.Ping), + TestTimeout, + CancellationToken.None); + await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout); + + // Advance the clock past HeartbeatStuckCeiling. The worker pipe's + // PendingCommand.StartTimestamp uses TimeProvider.GetTimestamp(), so the + // ManualTimeProvider's GetElapsedTime sees the advanced gap. + clock.Advance(TimeSpan.FromSeconds(2)); + + await WaitUntilAsync( + () => client.State == WorkerClientState.Faulted, + TestTimeout); + + Assert.Equal(WorkerClientState.Faulted, client.State); + } + + /// + /// Server-032 regression: a transient burst that exceeds + /// must be + /// absorbed for up to + /// (the channel is configured for BoundedChannelFullMode.Wait); + /// only when the wait elapses without progress is the worker faulted, + /// and the diagnostic must name the channel capacity, depth, and + /// actionable remediation. + /// + [Fact] + public async Task EnqueueWorkerEvent_WhenChannelFullPastTimeout_FaultsWithRichDiagnostic() + { + await using PipePair pipePair = await PipePair.CreateAsync(); + await using WorkerClient client = CreateClient( + pipePair, + new WorkerClientOptions + { + EventChannelCapacity = 4, + EventChannelFullModeTimeout = TimeSpan.FromMilliseconds(100), + HeartbeatGrace = TimeSpan.FromSeconds(30), + HeartbeatCheckInterval = TimeSpan.FromSeconds(1), + }); + await CompleteHandshakeAsync(client, pipePair); + + // Fill the channel plus one to force the overflow path. The gateway + // never opens a StreamEvents consumer so the events stay in the + // bounded channel. + for (ulong sequence = 1; sequence <= 6; sequence++) + { + await pipePair.WorkerWriter.WriteAsync( + CreateEventEnvelope(sequence: sequence, MxEventFamily.OnDataChange)); + } + + await WaitUntilAsync( + () => client.State == WorkerClientState.Faulted, + TestTimeout); + + Assert.Equal(WorkerClientState.Faulted, client.State); + + // Reading the events channel after fault throws the propagated + // WorkerClientException carrying the rich diagnostic message. + WorkerClientException fault = await Assert.ThrowsAsync(async () => + { + await foreach (WorkerEvent _ in client.ReadEventsAsync(CancellationToken.None)) + { + } + }); + Assert.Contains("Worker event channel rejected", fault.Message); + Assert.Contains("of 4 capacity", fault.Message); + Assert.Contains("StreamEvents", fault.Message); + Assert.Contains("MxGateway:Events:QueueCapacity", fault.Message); + } + private static WorkerClient CreateClient( PipePair pipePair, WorkerClientOptions? options = null,