From 4966ef33594385e995c1b7cc44c34e751bc6e60e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 16 Jun 2026 16:48:02 -0400 Subject: [PATCH] feat(server): bounded worker-ready wait in GatewaySession (default off) --- .../Sessions/GatewaySession.cs | 171 ++++++++++++++++-- .../Sessions/SessionManager.cs | 3 +- .../Gateway/Sessions/SessionManagerTests.cs | 123 ++++++++++++- 3 files changed, 283 insertions(+), 14 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index be4d36f..70539f9 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -23,6 +23,7 @@ public sealed class GatewaySession private bool _closeStarted; private int _activeEventSubscriberCount; private readonly TimeSpan _detachGrace; + private readonly TimeSpan _workerReadyWaitTimeout; private DateTimeOffset? _detachedAtUtc; private SessionEventDistributor? _eventDistributor; private bool _eventDistributorStarted; @@ -115,6 +116,17 @@ public sealed class GatewaySession /// The clock comes from 's /// so the timer is unit-testable. /// + /// + /// Bounded time the session will wait, on the command/event hot path, for the worker + /// client to reach when the session is already + /// but the worker state has transiently diverged + /// (e.g. after a heartbeat blip). The wait + /// applies only to transient worker states; terminal states + /// (// + /// /no worker) and a non-Ready session fail + /// fast immediately. (the default) disables the wait and + /// preserves the original fail-fast behavior byte-for-byte. + /// public GatewaySession( string sessionId, string backendName, @@ -130,7 +142,8 @@ public sealed class GatewaySession TimeSpan leaseDuration, DateTimeOffset openedAt, SessionEventStreaming? 
eventStreaming = null, - TimeSpan detachGrace = default) + TimeSpan detachGrace = default, + TimeSpan workerReadyWaitTimeout = default) { if (string.IsNullOrWhiteSpace(sessionId)) { @@ -169,6 +182,7 @@ public sealed class GatewaySession _leaseExpiresAt = openedAt + leaseDuration; _eventStreaming = eventStreaming ?? SessionEventStreaming.Default; _detachGrace = detachGrace > TimeSpan.Zero ? detachGrace : TimeSpan.Zero; + _workerReadyWaitTimeout = workerReadyWaitTimeout > TimeSpan.Zero ? workerReadyWaitTimeout : TimeSpan.Zero; } /// @@ -915,7 +929,7 @@ public sealed class GatewaySession WorkerCommand command, CancellationToken cancellationToken) { - IWorkerClient workerClient = GetReadyWorkerClient(); + IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false); TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow()); return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false); @@ -1258,12 +1272,20 @@ public sealed class GatewaySession /// Reads events from the worker as an asynchronous enumerable stream. /// /// Token to cancel the asynchronous operation. - public IAsyncEnumerable ReadEventsAsync(CancellationToken cancellationToken) + /// An asynchronous stream of worker events. 
+    public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
+        [EnumeratorCancellation] CancellationToken cancellationToken)
     {
-        IWorkerClient workerClient = GetReadyWorkerClient();
+        IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
         TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());

-        return workerClient.ReadEventsAsync(cancellationToken);
+        await foreach (WorkerEvent workerEvent in workerClient
+            .ReadEventsAsync(cancellationToken)
+            .WithCancellation(cancellationToken)
+            .ConfigureAwait(false))
+        {
+            yield return workerEvent;
+        }
     }

     ///
@@ -1666,18 +1688,143 @@ public sealed class GatewaySession
     {
         lock (_syncRoot)
         {
-            if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
+            IWorkerClient? ready = EvaluateReadyUnderLock(out string? failureMessage);
+            if (ready is not null)
             {
-                string workerState = _workerClient is null
-                    ? ""
-                    : _workerClient.State.ToString();
-                throw new SessionManagerException(
-                    SessionManagerErrorCode.SessionNotReady,
-                    $"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.");
+                return ready;
             }

+            throw new SessionManagerException(SessionManagerErrorCode.SessionNotReady, failureMessage ?? BuildNotReadyMessage()); // null message = transient worker; still surface both states
+        }
+    }
+
+    /// <summary>
+    /// Bounded, opt-in async variant of <see cref="GetReadyWorkerClient"/>. When the
+    /// session is <see cref="SessionState.Ready"/> but the worker has transiently diverged
+    /// to a non-terminal state (<see cref="WorkerClientState.Handshaking"/>/
+    /// <see cref="WorkerClientState.Created"/>) and the configured worker-ready wait timeout
+    /// is positive, this polls (outside <c>_syncRoot</c>) until the worker reaches
+    /// <see cref="WorkerClientState.Ready"/> or the deadline elapses, re-evaluating the
+    /// fast-path/fail-fast decision under the lock on each poll. Terminal worker states, a
+    /// missing worker, or a non-Ready session fail fast immediately. With the default
+    /// timeout of zero this behaves byte-for-byte like the synchronous fail-fast path: no
+    /// await, no delay.
+    /// </summary>
+    /// <param name="cancellationToken">Token to cancel the wait.</param>
+    /// <returns>The worker client once both the session and worker are Ready.</returns>
+    private async Task<IWorkerClient> GetReadyWorkerClientAsync(CancellationToken cancellationToken)
+    {
+        const int pollIntervalMs = 25;
+
+        string? failureMessage;
+        lock (_syncRoot)
+        {
+            IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
+            if (ready is not null)
+            {
+                return ready;
+            }
+
+            // Only transient (non-terminal) worker states with a positive wait timeout fall
+            // through to the bounded wait loop. Everything else (terminal worker, no worker,
+            // session not Ready, or a zero timeout) fails fast right here under the lock. When
+            // the worker is merely transient (failureMessage is null) but the wait is disabled,
+            // build the both-states diagnostic so the zero-timeout path keeps the original
+            // fail-fast message.
+            if (failureMessage is not null || _workerReadyWaitTimeout <= TimeSpan.Zero)
+            {
+                throw new SessionManagerException(
+                    SessionManagerErrorCode.SessionNotReady,
+                    failureMessage ?? BuildNotReadyMessage());
+            }
+        }
+
+        // The deadline and the poll delay both run on the session's TimeProvider, so the
+        // wait follows a single clock (and can be driven by a fake provider in tests)
+        // instead of mixing the injected clock with a system-clock Task.Delay.
+        TimeProvider timeProvider = _eventStreaming.TimeProvider;
+        DateTimeOffset deadline = timeProvider.GetUtcNow() + _workerReadyWaitTimeout;
+        while (true)
+        {
+            TimeSpan remaining = deadline - timeProvider.GetUtcNow();
+            bool deadlineReached = remaining <= TimeSpan.Zero;
+            if (!deadlineReached)
+            {
+                // Never sleep past the deadline: clamp the poll interval to the time left.
+                // Rounding up to a whole millisecond keeps a sub-millisecond remainder from
+                // collapsing into a zero-length delay that would spin on the lock.
+                TimeSpan delay = TimeSpan.FromMilliseconds(
+                    Math.Min(pollIntervalMs, Math.Ceiling(remaining.TotalMilliseconds)));
+                await Task.Delay(delay, timeProvider, cancellationToken).ConfigureAwait(false);
+            }
+
+            lock (_syncRoot)
+            {
+                IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
+                if (ready is not null)
+                {
+                    return ready;
+                }
+
+                // A terminal worker / missing worker / non-Ready session surfaced while we
+                // waited (failureMessage set): fail fast rather than burning the rest of the
+                // deadline. Once the deadline has passed, a still-transient worker fails
+                // with the both-states diagnostic after this one final evaluation.
+                if (failureMessage is not null || deadlineReached)
+                {
+                    throw new SessionManagerException(
+                        SessionManagerErrorCode.SessionNotReady,
+                        failureMessage ?? BuildNotReadyMessage());
+                }
+            }
+        }
+    }
+
+    /// <summary>
+    /// Evaluates readiness while the caller already holds <c>_syncRoot</c>. Returns the
+    /// worker client when both the session and worker are Ready (with
+    /// <paramref name="failureMessage"/> set to <see langword="null"/>). Returns
+    /// <see langword="null"/> together with the both-states diagnostic in
+    /// <paramref name="failureMessage"/> when there is no worker, the session is not
+    /// <see cref="SessionState.Ready"/>, or the worker is in any state other than Ready,
+    /// Handshaking, or Created (treated as terminal). Returns <see langword="null"/> with a
+    /// <see langword="null"/> <paramref name="failureMessage"/> when the session is Ready but
+    /// the worker is in a transient state (<see cref="WorkerClientState.Handshaking"/>/
+    /// <see cref="WorkerClientState.Created"/>) — the signal for the async path to keep waiting.
+    /// </summary>
+    /// <param name="failureMessage">
+    /// The fail-fast both-states diagnostic when readiness cannot succeed, or
+    /// <see langword="null"/> for the keep-waiting (transient) signal.
+    /// </param>
+    /// <returns>The ready worker client, or <see langword="null"/>.</returns>
+    private IWorkerClient? EvaluateReadyUnderLock(out string? failureMessage)
+    {
+        if (_state == SessionState.Ready && _workerClient?.State == WorkerClientState.Ready)
+        {
+            failureMessage = null;
             return _workerClient;
         }
+
+        // Keep-waiting signal: session is Ready and the worker is merely transient.
+        if (_state == SessionState.Ready
+            && _workerClient is { State: WorkerClientState.Handshaking or WorkerClientState.Created })
+        {
+            failureMessage = null;
+            return null;
+        }
+
+        failureMessage = BuildNotReadyMessage();
+        return null;
+    }
+
+    /// <summary>Builds the both-states not-ready diagnostic (must be called under <c>_syncRoot</c>).</summary>
+    /// <returns>The diagnostic message surfacing both the session and worker states.</returns>
+    private string BuildNotReadyMessage()
+    {
+        string workerState = _workerClient is null
+            ? ""
+            : _workerClient.State.ToString();
+        return $"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.";
     }

     private void TrackItem(
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
index 5517f4d..9982794 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
@@ -505,7 +505,8 @@ public sealed class SessionManager : ISessionManager
             leaseDuration,
             openedAt,
             eventStreaming,
-            TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)));
+            TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)),
+            TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs)));
     }

     private static string CreateClientCorrelationId(
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
index ce7265d..68bbecb 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
@@ -1,3 +1,4 @@
+using System.Diagnostics;
 using Google.Protobuf.WellKnownTypes;
 using Microsoft.Extensions.Options;
 using Microsoft.Extensions.Time.Testing;
@@ -366,6 +367,124 @@ public sealed class SessionManagerTests
         Assert.Equal(0, workerClient.InvokeCount);
     }

+    ///
+    /// With the opt-in worker-ready wait enabled, a worker that is transiently
+    /// Handshaking but flips to Ready within the timeout window must let the
+    /// command through rather than fail fast.
+    ///
+    [Fact]
+    public async Task InvokeAsync_WhenWorkerHandshakingThenReadyWithinTimeout_Succeeds()
+    {
+        FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
+        SessionManager manager = CreateManager(
+            new FakeSessionWorkerClientFactory(workerClient),
+            options: CreateOptions(workerReadyWaitTimeoutMs: 500));
+        GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+        Assert.Equal(SessionState.Ready, session.State);
+
+        // Flip the worker to Ready from a background task ~50 ms after this point, well inside the 500 ms wait configured above.
+        _ = Task.Run(async () =>
+        {
+            await Task.Delay(50, CancellationToken.None);
+            workerClient.State = WorkerClientState.Ready;
+        });
+
+        WorkerCommandReply reply = await manager.InvokeAsync(
+            session.SessionId,
+            CreateCommand(MxCommandKind.Ping),
+            CancellationToken.None);
+
+        Assert.NotNull(reply);
+        Assert.Equal(1, workerClient.InvokeCount);
+    }
+
+    ///
+    /// A terminal worker state (Faulted) must fail fast even with a positive
+    /// worker-ready wait timeout, surfacing both states without burning the timeout.
+    ///
+    [Fact]
+    public async Task InvokeAsync_WhenWorkerFaulted_FailsFastWithBothStates()
+    {
+        FakeWorkerClient workerClient = new() { State = WorkerClientState.Faulted };
+        SessionManager manager = CreateManager(
+            new FakeSessionWorkerClientFactory(workerClient),
+            options: CreateOptions(workerReadyWaitTimeoutMs: 500));
+        GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+        Assert.Equal(SessionState.Ready, session.State);
+
+        Stopwatch stopwatch = Stopwatch.StartNew();
+        SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
+            async () => await manager.InvokeAsync(
+                session.SessionId,
+                CreateCommand(MxCommandKind.Ping),
+                CancellationToken.None));
+        stopwatch.Stop();
+
+        Assert.True(stopwatch.ElapsedMilliseconds < 500, $"Expected fail-fast well inside the 500ms wait timeout but took {stopwatch.ElapsedMilliseconds}ms."); // bound = configured timeout, not an arbitrary wall-clock budget
+        Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
+        Assert.Contains("Session state is Ready", exception.Message);
+        Assert.Contains("worker state is Faulted", exception.Message);
+        Assert.Equal(0, workerClient.InvokeCount);
+    }
+
+    ///
+    /// When the worker stays transiently not-ready for the whole (small) timeout window,
+    /// the invoke fails after roughly the timeout with both states surfaced.
+    ///
+    [Fact]
+    public async Task InvokeAsync_WhenTimeoutElapsesStillNotReady_FailsWithBothStates()
+    {
+        FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
+        SessionManager manager = CreateManager(
+            new FakeSessionWorkerClientFactory(workerClient),
+            options: CreateOptions(workerReadyWaitTimeoutMs: 100));
+        GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+        Assert.Equal(SessionState.Ready, session.State);
+
+        Stopwatch stopwatch = Stopwatch.StartNew();
+        SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
+            async () => await manager.InvokeAsync(
+                session.SessionId,
+                CreateCommand(MxCommandKind.Ping),
+                CancellationToken.None));
+        stopwatch.Stop();
+
+        Assert.True(stopwatch.ElapsedMilliseconds >= 90, $"Expected the wait to span the timeout but took only {stopwatch.ElapsedMilliseconds}ms."); // 90 leaves ~10 ms of slack under the 100 ms timeout
+        Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
+        Assert.Contains("Session state is Ready", exception.Message);
+        Assert.Contains("worker state is Handshaking", exception.Message);
+        Assert.Equal(0, workerClient.InvokeCount);
+    }
+
+    ///
+    /// Pins the default (timeout == 0) behavior: a transiently Handshaking worker
+    /// fails fast immediately, byte-for-byte like the original fail-fast path.
+    ///
+    [Fact]
+    public async Task InvokeAsync_WhenTimeoutZero_FailsFastUnchanged()
+    {
+        FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
+        SessionManager manager = CreateManager(
+            new FakeSessionWorkerClientFactory(workerClient),
+            options: CreateOptions(workerReadyWaitTimeoutMs: 0));
+        GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+        Assert.Equal(SessionState.Ready, session.State);
+
+        Stopwatch stopwatch = Stopwatch.StartNew();
+        SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
+            async () => await manager.InvokeAsync(
+                session.SessionId,
+                CreateCommand(MxCommandKind.Ping),
+                CancellationToken.None));
+        stopwatch.Stop();
+
+        Assert.True(stopwatch.ElapsedMilliseconds < 100, $"Expected immediate fail-fast but took {stopwatch.ElapsedMilliseconds}ms."); // wait disabled: no poll delay should occur at all
+        Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
+        Assert.Contains("Session state is Ready", exception.Message);
+        Assert.Contains("worker state is Handshaking", exception.Message);
+        Assert.Equal(0, workerClient.InvokeCount);
+    }
+
     /// Verifies that closing a session removes it from the registry.
     [Fact]
     public async Task CloseSessionAsync_RemovesClosedSession()
@@ -876,7 +995,8 @@ public sealed class SessionManagerTests
     private static GatewayOptions CreateOptions(
         int maxSessions = 64,
         int defaultLeaseSeconds = 1800,
-        int detachGraceSeconds = 0)
+        int detachGraceSeconds = 0,
+        int workerReadyWaitTimeoutMs = 0)
     {
         return new GatewayOptions
         {
@@ -886,6 +1006,7 @@ public sealed class SessionManagerTests
                 MaxSessions = maxSessions,
                 DefaultLeaseSeconds = defaultLeaseSeconds,
                 DetachGraceSeconds = detachGraceSeconds,
+                WorkerReadyWaitTimeoutMs = workerReadyWaitTimeoutMs,
             },
             Worker = new WorkerOptions
             {