feat(server): bounded worker-ready wait in GatewaySession (default off)
This commit is contained in:
@@ -23,6 +23,7 @@ public sealed class GatewaySession
|
||||
private bool _closeStarted;
|
||||
private int _activeEventSubscriberCount;
|
||||
private readonly TimeSpan _detachGrace;
|
||||
private readonly TimeSpan _workerReadyWaitTimeout;
|
||||
private DateTimeOffset? _detachedAtUtc;
|
||||
private SessionEventDistributor? _eventDistributor;
|
||||
private bool _eventDistributorStarted;
|
||||
@@ -115,6 +116,17 @@ public sealed class GatewaySession
|
||||
/// The clock comes from <paramref name="eventStreaming"/>'s
|
||||
/// <see cref="SessionEventStreaming.TimeProvider"/> so the timer is unit-testable.
|
||||
/// </param>
|
||||
/// <param name="workerReadyWaitTimeout">
|
||||
/// Bounded time the session will wait, on the command/event hot path, for the worker
|
||||
/// client to reach <see cref="WorkerClientState.Ready"/> when the session is already
|
||||
/// <see cref="SessionState.Ready"/> but the worker state has transiently diverged
|
||||
/// (e.g. <see cref="WorkerClientState.Handshaking"/> after a heartbeat blip). The wait
|
||||
/// applies only to transient worker states; terminal states
|
||||
/// (<see cref="WorkerClientState.Faulted"/>/<see cref="WorkerClientState.Closing"/>/
|
||||
/// <see cref="WorkerClientState.Closed"/>/no worker) and a non-<c>Ready</c> session fail
|
||||
/// fast immediately. <see cref="TimeSpan.Zero"/> (the default) disables the wait and
|
||||
/// preserves the original fail-fast behavior byte-for-byte.
|
||||
/// </param>
|
||||
public GatewaySession(
|
||||
string sessionId,
|
||||
string backendName,
|
||||
@@ -130,7 +142,8 @@ public sealed class GatewaySession
|
||||
TimeSpan leaseDuration,
|
||||
DateTimeOffset openedAt,
|
||||
SessionEventStreaming? eventStreaming = null,
|
||||
TimeSpan detachGrace = default)
|
||||
TimeSpan detachGrace = default,
|
||||
TimeSpan workerReadyWaitTimeout = default)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
@@ -169,6 +182,7 @@ public sealed class GatewaySession
|
||||
_leaseExpiresAt = openedAt + leaseDuration;
|
||||
_eventStreaming = eventStreaming ?? SessionEventStreaming.Default;
|
||||
_detachGrace = detachGrace > TimeSpan.Zero ? detachGrace : TimeSpan.Zero;
|
||||
_workerReadyWaitTimeout = workerReadyWaitTimeout > TimeSpan.Zero ? workerReadyWaitTimeout : TimeSpan.Zero;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -915,7 +929,7 @@ public sealed class GatewaySession
|
||||
WorkerCommand command,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
IWorkerClient workerClient = GetReadyWorkerClient();
|
||||
IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
|
||||
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
|
||||
|
||||
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
|
||||
@@ -1258,12 +1272,20 @@ public sealed class GatewaySession
|
||||
/// Reads events from the worker as an asynchronous enumerable stream.
|
||||
/// </summary>
|
||||
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
|
||||
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
|
||||
/// <returns>An asynchronous stream of worker events.</returns>
|
||||
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
IWorkerClient workerClient = GetReadyWorkerClient();
|
||||
IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
|
||||
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
|
||||
|
||||
return workerClient.ReadEventsAsync(cancellationToken);
|
||||
await foreach (WorkerEvent workerEvent in workerClient
|
||||
.ReadEventsAsync(cancellationToken)
|
||||
.WithCancellation(cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
yield return workerEvent;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -1666,18 +1688,143 @@ public sealed class GatewaySession
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
|
||||
IWorkerClient? ready = EvaluateReadyUnderLock(out string? failureMessage);
|
||||
if (ready is not null)
|
||||
{
|
||||
string workerState = _workerClient is null
|
||||
? "<no worker>"
|
||||
: _workerClient.State.ToString();
|
||||
throw new SessionManagerException(
|
||||
SessionManagerErrorCode.SessionNotReady,
|
||||
$"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.");
|
||||
return ready;
|
||||
}
|
||||
|
||||
throw new SessionManagerException(SessionManagerErrorCode.SessionNotReady, failureMessage!);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Bounded, opt-in async variant of <see cref="GetReadyWorkerClient"/>. When the session is
/// <see cref="SessionState.Ready"/> but the worker is in a transient state
/// (<see cref="WorkerClientState.Handshaking"/>/<see cref="WorkerClientState.Created"/>) and
/// the configured worker-ready wait timeout is positive, this polls (outside <c>_syncRoot</c>)
/// until the worker reaches <see cref="WorkerClientState.Ready"/> or the deadline elapses,
/// re-evaluating readiness under the lock after every poll. Terminal worker states, a missing
/// worker, or a non-<c>Ready</c> session fail fast immediately. With a timeout of zero the
/// outcome is decided by the first locked evaluation and no delay is ever awaited.
/// </summary>
/// <remarks>
/// The deadline and the poll delay are both driven by the session's
/// <see cref="SessionEventStreaming.TimeProvider"/>, so the wait stays consistent when a test
/// clock is injected. Each delay is capped at the time remaining until the deadline, so a
/// timeout shorter than the poll interval is not overshot.
/// </remarks>
/// <param name="cancellationToken">Token to cancel the wait.</param>
/// <returns>The worker client once both the session and worker are <c>Ready</c>.</returns>
/// <exception cref="SessionManagerException">
/// Thrown with <see cref="SessionManagerErrorCode.SessionNotReady"/> when readiness cannot be
/// reached: immediately for fail-fast conditions, or once the deadline has elapsed while the
/// worker is still transient.
/// </exception>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="cancellationToken"/> is cancelled during a poll delay.
/// </exception>
private async Task<IWorkerClient> GetReadyWorkerClientAsync(CancellationToken cancellationToken)
{
    TimeSpan pollInterval = TimeSpan.FromMilliseconds(25);
    TimeProvider timeProvider = _eventStreaming.TimeProvider;

    lock (_syncRoot)
    {
        IWorkerClient? ready = EvaluateReadyUnderLock(out string? failureMessage);
        if (ready is not null)
        {
            return ready;
        }

        // Only a transient worker combined with a positive timeout proceeds to the wait loop.
        // A null failureMessage means "transient"; when the wait is disabled the both-states
        // diagnostic is built here so the zero-timeout path reports the same message as the
        // synchronous fail-fast path.
        if (failureMessage is not null || _workerReadyWaitTimeout <= TimeSpan.Zero)
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotReady,
                failureMessage ?? BuildNotReadyMessage());
        }
    }

    DateTimeOffset deadline = timeProvider.GetUtcNow() + _workerReadyWaitTimeout;
    while (true)
    {
        // Never sleep past the deadline: cap the poll delay at the remaining time.
        TimeSpan remaining = deadline - timeProvider.GetUtcNow();
        TimeSpan delay = remaining < pollInterval ? remaining : pollInterval;
        if (delay > TimeSpan.Zero)
        {
            // Delay on the same clock that defines the deadline.
            await Task.Delay(delay, timeProvider, cancellationToken).ConfigureAwait(false);
        }

        bool deadlineReached = timeProvider.GetUtcNow() >= deadline;

        lock (_syncRoot)
        {
            IWorkerClient? ready = EvaluateReadyUnderLock(out string? failureMessage);
            if (ready is not null)
            {
                return ready;
            }

            // Fail as soon as a terminal worker / missing worker / non-Ready session surfaces
            // (failureMessage set), or when the deadline passed while still transient.
            if (failureMessage is not null || deadlineReached)
            {
                throw new SessionManagerException(
                    SessionManagerErrorCode.SessionNotReady,
                    failureMessage ?? BuildNotReadyMessage());
            }
        }
    }
}
|
||||
|
||||
/// <summary>
/// Evaluates readiness while the caller already holds <c>_syncRoot</c>. Returns the worker
/// client when the session is <see cref="SessionState.Ready"/> and the worker is
/// <see cref="WorkerClientState.Ready"/> (with <paramref name="failureMessage"/> set to
/// <see langword="null"/>). Returns <see langword="null"/> with a <see langword="null"/>
/// <paramref name="failureMessage"/> when the session is <c>Ready</c> but the worker is in a
/// transient state
/// (<see cref="WorkerClientState.Handshaking"/>/<see cref="WorkerClientState.Created"/>) —
/// the signal for the async path to keep waiting. In every other case (any other worker
/// state, no worker, or a session that is not <c>Ready</c>) returns <see langword="null"/>
/// together with the both-states diagnostic in <paramref name="failureMessage"/>.
/// </summary>
/// <param name="failureMessage">
/// The fail-fast both-states diagnostic when readiness cannot succeed, or
/// <see langword="null"/> for the ready and keep-waiting (transient) outcomes.
/// </param>
/// <returns>The ready worker client, or <see langword="null"/>.</returns>
private IWorkerClient? EvaluateReadyUnderLock(out string? failureMessage)
{
    failureMessage = null;

    if (_state == SessionState.Ready && _workerClient is { } workerClient)
    {
        // Read the worker state exactly once. Reading it separately for the "ready" and
        // "transient" checks lets a Handshaking -> Ready flip between the two reads match
        // neither branch and be misreported as a fail-fast condition.
        switch (workerClient.State)
        {
            case WorkerClientState.Ready:
                return workerClient;

            case WorkerClientState.Handshaking:
            case WorkerClientState.Created:
                // Keep-waiting signal: session is Ready and the worker is merely transient.
                return null;
        }
    }

    failureMessage = BuildNotReadyMessage();
    return null;
}
|
||||
|
||||
/// <summary>
/// Formats the not-ready diagnostic that reports the session state alongside the worker
/// state (must be called under <c>_syncRoot</c>). A missing worker is rendered as
/// <c>&lt;no worker&gt;</c>.
/// </summary>
/// <returns>The diagnostic message surfacing both the session and worker states.</returns>
private string BuildNotReadyMessage()
{
    string workerState = _workerClient?.State.ToString() ?? "<no worker>";

    return $"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.";
}
|
||||
|
||||
private void TrackItem(
|
||||
|
||||
@@ -505,7 +505,8 @@ public sealed class SessionManager : ISessionManager
|
||||
leaseDuration,
|
||||
openedAt,
|
||||
eventStreaming,
|
||||
TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)));
|
||||
TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)),
|
||||
TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs)));
|
||||
}
|
||||
|
||||
private static string CreateClientCorrelationId(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Diagnostics;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Microsoft.Extensions.Time.Testing;
|
||||
@@ -366,6 +367,124 @@ public sealed class SessionManagerTests
|
||||
Assert.Equal(0, workerClient.InvokeCount);
|
||||
}
|
||||
|
||||
/// <summary>
/// Opt-in worker-ready wait: the worker starts out <c>Handshaking</c> and becomes
/// <c>Ready</c> before the configured timeout expires, so the command is expected to be
/// dispatched instead of failing fast.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenWorkerHandshakingThenReadyWithinTimeout_Succeeds()
{
    // Arrange: a Ready session whose worker is still Handshaking, with a 500 ms wait enabled.
    FakeWorkerClient worker = new() { State = WorkerClientState.Handshaking };
    FakeSessionWorkerClientFactory workerFactory = new FakeSessionWorkerClientFactory(worker);
    GatewayOptions gatewayOptions = CreateOptions(workerReadyWaitTimeoutMs: 500);
    SessionManager sessionManager = CreateManager(workerFactory, options: gatewayOptions);
    GatewaySession opened = await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        ownerKeyId: null,
        CancellationToken.None);
    Assert.Equal(SessionState.Ready, opened.State);

    // Moves the worker to Ready a little after the invoke below has begun waiting.
    async Task FlipToReadyAsync()
    {
        await Task.Delay(50, CancellationToken.None);
        worker.State = WorkerClientState.Ready;
    }

    _ = Task.Run(() => FlipToReadyAsync());

    // Act
    WorkerCommandReply reply = await sessionManager.InvokeAsync(
        opened.SessionId,
        CreateCommand(MxCommandKind.Ping),
        CancellationToken.None);

    // Assert: the command reached the worker exactly once.
    Assert.NotNull(reply);
    Assert.Equal(1, worker.InvokeCount);
}
|
||||
|
||||
/// <summary>
/// A <c>Faulted</c> worker is terminal: even with a positive worker-ready wait timeout the
/// invoke must be rejected right away, and the error must name both the session state and
/// the worker state.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenWorkerFaulted_FailsFastWithBothStates()
{
    // Arrange: a Ready session backed by a Faulted worker, with a 500 ms wait enabled.
    FakeWorkerClient worker = new() { State = WorkerClientState.Faulted };
    FakeSessionWorkerClientFactory workerFactory = new FakeSessionWorkerClientFactory(worker);
    GatewayOptions gatewayOptions = CreateOptions(workerReadyWaitTimeoutMs: 500);
    SessionManager sessionManager = CreateManager(workerFactory, options: gatewayOptions);
    GatewaySession opened = await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        ownerKeyId: null,
        CancellationToken.None);
    Assert.Equal(SessionState.Ready, opened.State);

    async Task InvokePingAsync()
    {
        await sessionManager.InvokeAsync(
            opened.SessionId,
            CreateCommand(MxCommandKind.Ping),
            CancellationToken.None);
    }

    // Act: time the rejected invoke.
    Stopwatch timer = Stopwatch.StartNew();
    SessionManagerException error = await Assert.ThrowsAsync<SessionManagerException>(
        () => InvokePingAsync());
    timer.Stop();
    long elapsedMs = timer.ElapsedMilliseconds;

    // Assert: rejected without consuming the timeout, with both states in the message.
    Assert.True(elapsedMs < 100, $"Expected immediate fail-fast but took {elapsedMs}ms.");
    Assert.Equal(SessionManagerErrorCode.SessionNotReady, error.ErrorCode);
    Assert.Contains("Session state is Ready", error.Message);
    Assert.Contains("worker state is Faulted", error.Message);
    Assert.Equal(0, worker.InvokeCount);
}
|
||||
|
||||
/// <summary>
/// The worker remains <c>Handshaking</c> for the entire (short) timeout window, so the
/// invoke is expected to fail only after roughly the timeout has passed, reporting both the
/// session state and the worker state.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenTimeoutElapsesStillNotReady_FailsWithBothStates()
{
    // Arrange: a Ready session whose worker never leaves Handshaking, with a 100 ms wait.
    FakeWorkerClient worker = new() { State = WorkerClientState.Handshaking };
    FakeSessionWorkerClientFactory workerFactory = new FakeSessionWorkerClientFactory(worker);
    GatewayOptions gatewayOptions = CreateOptions(workerReadyWaitTimeoutMs: 100);
    SessionManager sessionManager = CreateManager(workerFactory, options: gatewayOptions);
    GatewaySession opened = await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        ownerKeyId: null,
        CancellationToken.None);
    Assert.Equal(SessionState.Ready, opened.State);

    async Task InvokePingAsync()
    {
        await sessionManager.InvokeAsync(
            opened.SessionId,
            CreateCommand(MxCommandKind.Ping),
            CancellationToken.None);
    }

    // Act: time the invoke that is expected to wait out the timeout.
    Stopwatch timer = Stopwatch.StartNew();
    SessionManagerException error = await Assert.ThrowsAsync<SessionManagerException>(
        () => InvokePingAsync());
    timer.Stop();
    long elapsedMs = timer.ElapsedMilliseconds;

    // Assert: the wait spanned (approximately) the timeout and both states are reported.
    Assert.True(elapsedMs >= 90, $"Expected the wait to span the timeout but took only {elapsedMs}ms.");
    Assert.Equal(SessionManagerErrorCode.SessionNotReady, error.ErrorCode);
    Assert.Contains("Session state is Ready", error.Message);
    Assert.Contains("worker state is Handshaking", error.Message);
    Assert.Equal(0, worker.InvokeCount);
}
|
||||
|
||||
/// <summary>
/// Pins the default configuration (timeout of zero): a worker that is transiently
/// <c>Handshaking</c> is rejected immediately, with the same both-states diagnostic as the
/// original fail-fast path.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenTimeoutZero_FailsFastUnchanged()
{
    // Arrange: a Ready session whose worker is Handshaking, with the wait disabled.
    FakeWorkerClient worker = new() { State = WorkerClientState.Handshaking };
    FakeSessionWorkerClientFactory workerFactory = new FakeSessionWorkerClientFactory(worker);
    GatewayOptions gatewayOptions = CreateOptions(workerReadyWaitTimeoutMs: 0);
    SessionManager sessionManager = CreateManager(workerFactory, options: gatewayOptions);
    GatewaySession opened = await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        ownerKeyId: null,
        CancellationToken.None);
    Assert.Equal(SessionState.Ready, opened.State);

    async Task InvokePingAsync()
    {
        await sessionManager.InvokeAsync(
            opened.SessionId,
            CreateCommand(MxCommandKind.Ping),
            CancellationToken.None);
    }

    // Act: time the rejected invoke.
    Stopwatch timer = Stopwatch.StartNew();
    SessionManagerException error = await Assert.ThrowsAsync<SessionManagerException>(
        () => InvokePingAsync());
    timer.Stop();
    long elapsedMs = timer.ElapsedMilliseconds;

    // Assert: no waiting took place and both states are reported.
    Assert.True(elapsedMs < 100, $"Expected immediate fail-fast but took {elapsedMs}ms.");
    Assert.Equal(SessionManagerErrorCode.SessionNotReady, error.ErrorCode);
    Assert.Contains("Session state is Ready", error.Message);
    Assert.Contains("worker state is Handshaking", error.Message);
    Assert.Equal(0, worker.InvokeCount);
}
|
||||
|
||||
/// <summary>Verifies that closing a session removes it from the registry.</summary>
|
||||
[Fact]
|
||||
public async Task CloseSessionAsync_RemovesClosedSession()
|
||||
@@ -876,7 +995,8 @@ public sealed class SessionManagerTests
|
||||
private static GatewayOptions CreateOptions(
|
||||
int maxSessions = 64,
|
||||
int defaultLeaseSeconds = 1800,
|
||||
int detachGraceSeconds = 0)
|
||||
int detachGraceSeconds = 0,
|
||||
int workerReadyWaitTimeoutMs = 0)
|
||||
{
|
||||
return new GatewayOptions
|
||||
{
|
||||
@@ -886,6 +1006,7 @@ public sealed class SessionManagerTests
|
||||
MaxSessions = maxSessions,
|
||||
DefaultLeaseSeconds = defaultLeaseSeconds,
|
||||
DetachGraceSeconds = detachGraceSeconds,
|
||||
WorkerReadyWaitTimeoutMs = workerReadyWaitTimeoutMs,
|
||||
},
|
||||
Worker = new WorkerOptions
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user