diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
index be4d36f..70539f9 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
@@ -23,6 +23,7 @@ public sealed class GatewaySession
private bool _closeStarted;
private int _activeEventSubscriberCount;
private readonly TimeSpan _detachGrace;
+ private readonly TimeSpan _workerReadyWaitTimeout;
private DateTimeOffset? _detachedAtUtc;
private SessionEventDistributor? _eventDistributor;
private bool _eventDistributorStarted;
@@ -115,6 +116,17 @@ public sealed class GatewaySession
/// The clock comes from 's
/// so the timer is unit-testable.
///
+ ///
+ /// Bounded time the session will wait, on the command/event hot path, for the worker
+ /// client to reach when the session is already
+ /// but the worker state has transiently diverged
+ /// (e.g. after a heartbeat blip). The wait
+ /// applies only to transient worker states; terminal states
+ /// (//
+ /// /no worker) and a non-Ready session fail
+ /// fast immediately. (the default) disables the wait and
+ /// preserves the original fail-fast behavior byte-for-byte.
+ ///
public GatewaySession(
string sessionId,
string backendName,
@@ -130,7 +142,8 @@ public sealed class GatewaySession
TimeSpan leaseDuration,
DateTimeOffset openedAt,
SessionEventStreaming? eventStreaming = null,
- TimeSpan detachGrace = default)
+ TimeSpan detachGrace = default,
+ TimeSpan workerReadyWaitTimeout = default)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
@@ -169,6 +182,7 @@ public sealed class GatewaySession
_leaseExpiresAt = openedAt + leaseDuration;
_eventStreaming = eventStreaming ?? SessionEventStreaming.Default;
_detachGrace = detachGrace > TimeSpan.Zero ? detachGrace : TimeSpan.Zero;
+ _workerReadyWaitTimeout = workerReadyWaitTimeout > TimeSpan.Zero ? workerReadyWaitTimeout : TimeSpan.Zero;
}
///
@@ -915,7 +929,7 @@ public sealed class GatewaySession
WorkerCommand command,
CancellationToken cancellationToken)
{
- IWorkerClient workerClient = GetReadyWorkerClient();
+ IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
@@ -1258,12 +1272,20 @@ public sealed class GatewaySession
/// Reads events from the worker as an asynchronous enumerable stream.
///
/// Token to cancel the asynchronous operation.
- public IAsyncEnumerable ReadEventsAsync(CancellationToken cancellationToken)
+ /// An asynchronous stream of worker events.
+ public async IAsyncEnumerable ReadEventsAsync(
+ [EnumeratorCancellation] CancellationToken cancellationToken)
{
- IWorkerClient workerClient = GetReadyWorkerClient();
+ IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
- return workerClient.ReadEventsAsync(cancellationToken);
+ await foreach (WorkerEvent workerEvent in workerClient
+ .ReadEventsAsync(cancellationToken)
+ .WithCancellation(cancellationToken)
+ .ConfigureAwait(false))
+ {
+ yield return workerEvent;
+ }
}
///
@@ -1666,18 +1688,143 @@ public sealed class GatewaySession
{
lock (_syncRoot)
{
- if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
+ IWorkerClient? ready = EvaluateReadyUnderLock(out string? failureMessage);
+ if (ready is not null)
{
- string workerState = _workerClient is null
- ? ""
- : _workerClient.State.ToString();
- throw new SessionManagerException(
- SessionManagerErrorCode.SessionNotReady,
- $"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.");
+ return ready;
}
+ throw new SessionManagerException(SessionManagerErrorCode.SessionNotReady, failureMessage!);
+ }
+ }
+
+ ///
+ /// Bounded, opt-in async variant of . When the
+ /// session is but the worker has transiently diverged
+ /// to a non-terminal state (/
+ /// ) and the configured worker-ready wait timeout
+ /// is positive, this polls (outside _syncRoot) until the worker reaches
+ /// or the deadline elapses, re-evaluating the
+ /// fast-path/fail-fast decision under the lock on each poll. Terminal worker states, a
+ /// missing worker, or a non-Ready session fail fast immediately. With the default
+ /// timeout of zero this behaves byte-for-byte like the synchronous fail-fast path: no
+ /// await, no delay.
+ ///
+ /// Token to cancel the wait.
+ /// The worker client once both the session and worker are Ready.
+ private async Task GetReadyWorkerClientAsync(CancellationToken cancellationToken)
+ {
+ const int pollIntervalMs = 25;
+
+ string? failureMessage;
+ lock (_syncRoot)
+ {
+ IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
+ if (ready is not null)
+ {
+ return ready;
+ }
+
+ // Only transient (non-terminal) worker states with a positive wait timeout fall
+ // through to the bounded wait loop. Everything else (terminal worker, no worker,
+ // session not Ready, or a zero timeout) fails fast right here under the lock. When
+ // the worker is merely transient (failureMessage is null) but the wait is disabled,
+ // build the both-states diagnostic so the zero-timeout path is byte-for-byte the
+ // original fail-fast message.
+ if (failureMessage is not null || _workerReadyWaitTimeout <= TimeSpan.Zero)
+ {
+ throw new SessionManagerException(
+ SessionManagerErrorCode.SessionNotReady,
+ failureMessage ?? BuildNotReadyMessage());
+ }
+ }
+
+ DateTimeOffset deadline = _eventStreaming.TimeProvider.GetUtcNow() + _workerReadyWaitTimeout;
+ while (true)
+ {
+ await Task.Delay(pollIntervalMs, cancellationToken).ConfigureAwait(false);
+
+ lock (_syncRoot)
+ {
+ IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
+ if (ready is not null)
+ {
+ return ready;
+ }
+
+ // A terminal worker / missing worker / non-Ready session surfaced while we
+ // waited: fail fast immediately rather than burning the rest of the deadline.
+ if (failureMessage is not null)
+ {
+ throw new SessionManagerException(SessionManagerErrorCode.SessionNotReady, failureMessage);
+ }
+ }
+
+ if (_eventStreaming.TimeProvider.GetUtcNow() >= deadline)
+ {
+ lock (_syncRoot)
+ {
+ IWorkerClient? ready = EvaluateReadyUnderLock(out failureMessage);
+ if (ready is not null)
+ {
+ return ready;
+ }
+
+ throw new SessionManagerException(
+ SessionManagerErrorCode.SessionNotReady,
+ failureMessage ?? BuildNotReadyMessage());
+ }
+ }
+ }
+ }
+
+ ///
+ /// Evaluates readiness while the caller already holds _syncRoot. Returns the
+ /// worker client when both the session and worker are
+ /// (with set to ). Returns
+ /// together with the both-states diagnostic in
+ /// when the worker is in a terminal state
+ /// (//
+ /// ), there is no worker, or the session is not
+ /// . Returns with a
+ /// when the session is
+ /// Ready but the worker is in a transient state
+ /// (/) —
+ /// the signal for the async path to keep waiting.
+ ///
+ ///
+ /// The fail-fast both-states diagnostic when readiness cannot succeed, or
+ /// for the keep-waiting (transient) signal.
+ ///
+ /// The ready worker client, or .
+ private IWorkerClient? EvaluateReadyUnderLock(out string? failureMessage)
+ {
+ if (_state == SessionState.Ready && _workerClient?.State == WorkerClientState.Ready)
+ {
+ failureMessage = null;
return _workerClient;
}
+
+ // Keep-waiting signal: session is Ready and the worker is merely transient.
+ if (_state == SessionState.Ready
+ && _workerClient is { State: WorkerClientState.Handshaking or WorkerClientState.Created })
+ {
+ failureMessage = null;
+ return null;
+ }
+
+ failureMessage = BuildNotReadyMessage();
+ return null;
+ }
+
+ /// Builds the both-states not-ready diagnostic (must be called under _syncRoot).
+ /// The diagnostic message surfacing both the session and worker states.
+ private string BuildNotReadyMessage()
+ {
+ string workerState = _workerClient is null
+ ? ""
+ : _workerClient.State.ToString();
+ return $"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.";
}
private void TrackItem(
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
index 5517f4d..9982794 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
@@ -505,7 +505,8 @@ public sealed class SessionManager : ISessionManager
leaseDuration,
openedAt,
eventStreaming,
- TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)));
+ TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)),
+ TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs)));
}
private static string CreateClientCorrelationId(
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
index ce7265d..68bbecb 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
@@ -1,3 +1,4 @@
+using System.Diagnostics;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Time.Testing;
@@ -366,6 +367,124 @@ public sealed class SessionManagerTests
Assert.Equal(0, workerClient.InvokeCount);
}
+ ///
+ /// With the opt-in worker-ready wait enabled, a worker that is transiently
+ /// Handshaking but flips to Ready within the timeout window must let the
+ /// command through rather than fail fast.
+ ///
+ [Fact]
+ public async Task InvokeAsync_WhenWorkerHandshakingThenReadyWithinTimeout_Succeeds()
+ {
+ FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
+ SessionManager manager = CreateManager(
+ new FakeSessionWorkerClientFactory(workerClient),
+ options: CreateOptions(workerReadyWaitTimeoutMs: 500));
+ GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+ Assert.Equal(SessionState.Ready, session.State);
+
+ // Flip the worker to Ready shortly after the invoke starts waiting.
+ _ = Task.Run(async () =>
+ {
+ await Task.Delay(50, CancellationToken.None);
+ workerClient.State = WorkerClientState.Ready;
+ });
+
+ WorkerCommandReply reply = await manager.InvokeAsync(
+ session.SessionId,
+ CreateCommand(MxCommandKind.Ping),
+ CancellationToken.None);
+
+ Assert.NotNull(reply);
+ Assert.Equal(1, workerClient.InvokeCount);
+ }
+
+ ///
+ /// A terminal worker state (Faulted) must fail fast even with a positive
+ /// worker-ready wait timeout, surfacing both states without burning the timeout.
+ ///
+ [Fact]
+ public async Task InvokeAsync_WhenWorkerFaulted_FailsFastWithBothStates()
+ {
+ FakeWorkerClient workerClient = new() { State = WorkerClientState.Faulted };
+ SessionManager manager = CreateManager(
+ new FakeSessionWorkerClientFactory(workerClient),
+ options: CreateOptions(workerReadyWaitTimeoutMs: 500));
+ GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+ Assert.Equal(SessionState.Ready, session.State);
+
+ Stopwatch stopwatch = Stopwatch.StartNew();
+ SessionManagerException exception = await Assert.ThrowsAsync(
+ async () => await manager.InvokeAsync(
+ session.SessionId,
+ CreateCommand(MxCommandKind.Ping),
+ CancellationToken.None));
+ stopwatch.Stop();
+
+ Assert.True(stopwatch.ElapsedMilliseconds < 100, $"Expected immediate fail-fast but took {stopwatch.ElapsedMilliseconds}ms.");
+ Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
+ Assert.Contains("Session state is Ready", exception.Message);
+ Assert.Contains("worker state is Faulted", exception.Message);
+ Assert.Equal(0, workerClient.InvokeCount);
+ }
+
+ ///
+ /// When the worker stays transiently not-ready for the whole (small) timeout window,
+ /// the invoke fails after roughly the timeout with both states surfaced.
+ ///
+ [Fact]
+ public async Task InvokeAsync_WhenTimeoutElapsesStillNotReady_FailsWithBothStates()
+ {
+ FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
+ SessionManager manager = CreateManager(
+ new FakeSessionWorkerClientFactory(workerClient),
+ options: CreateOptions(workerReadyWaitTimeoutMs: 100));
+ GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+ Assert.Equal(SessionState.Ready, session.State);
+
+ Stopwatch stopwatch = Stopwatch.StartNew();
+ SessionManagerException exception = await Assert.ThrowsAsync(
+ async () => await manager.InvokeAsync(
+ session.SessionId,
+ CreateCommand(MxCommandKind.Ping),
+ CancellationToken.None));
+ stopwatch.Stop();
+
+ Assert.True(stopwatch.ElapsedMilliseconds >= 90, $"Expected the wait to span the timeout but took only {stopwatch.ElapsedMilliseconds}ms.");
+ Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
+ Assert.Contains("Session state is Ready", exception.Message);
+ Assert.Contains("worker state is Handshaking", exception.Message);
+ Assert.Equal(0, workerClient.InvokeCount);
+ }
+
+ ///
+ /// Pins the default (timeout == 0) behavior: a transiently Handshaking worker
+ /// fails fast immediately, byte-for-byte like the original fail-fast path.
+ ///
+ [Fact]
+ public async Task InvokeAsync_WhenTimeoutZero_FailsFastUnchanged()
+ {
+ FakeWorkerClient workerClient = new() { State = WorkerClientState.Handshaking };
+ SessionManager manager = CreateManager(
+ new FakeSessionWorkerClientFactory(workerClient),
+ options: CreateOptions(workerReadyWaitTimeoutMs: 0));
+ GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
+ Assert.Equal(SessionState.Ready, session.State);
+
+ Stopwatch stopwatch = Stopwatch.StartNew();
+ SessionManagerException exception = await Assert.ThrowsAsync(
+ async () => await manager.InvokeAsync(
+ session.SessionId,
+ CreateCommand(MxCommandKind.Ping),
+ CancellationToken.None));
+ stopwatch.Stop();
+
+ Assert.True(stopwatch.ElapsedMilliseconds < 100, $"Expected immediate fail-fast but took {stopwatch.ElapsedMilliseconds}ms.");
+ Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
+ Assert.Contains("Session state is Ready", exception.Message);
+ Assert.Contains("worker state is Handshaking", exception.Message);
+ Assert.Equal(0, workerClient.InvokeCount);
+ }
+
/// Verifies that closing a session removes it from the registry.
[Fact]
public async Task CloseSessionAsync_RemovesClosedSession()
@@ -876,7 +995,8 @@ public sealed class SessionManagerTests
private static GatewayOptions CreateOptions(
int maxSessions = 64,
int defaultLeaseSeconds = 1800,
- int detachGraceSeconds = 0)
+ int detachGraceSeconds = 0,
+ int workerReadyWaitTimeoutMs = 0)
{
return new GatewayOptions
{
@@ -886,6 +1006,7 @@ public sealed class SessionManagerTests
MaxSessions = maxSessions,
DefaultLeaseSeconds = defaultLeaseSeconds,
DetachGraceSeconds = detachGraceSeconds,
+ WorkerReadyWaitTimeoutMs = workerReadyWaitTimeoutMs,
},
Worker = new WorkerOptions
{