fix(sessions): expose DetachGraceSeconds in effective-config; single clock; close reconnect-vs-sweep race
- EffectiveSessionConfiguration: add DetachGraceSeconds field; GatewayConfigurationProvider forwards value.Sessions.DetachGraceSeconds (blocker fix). - GatewaySession.InvokeAsync and ReadEventsAsync: switch TouchClientActivity calls from DateTimeOffset.UtcNow to _eventStreaming.TimeProvider.GetUtcNow() so Task 12 fake-clock control works end-to-end (split-clock fix). - TOCTOU fix: add TryBeginCloseIfExpired(now, out alreadyClosing) to GatewaySession that re-checks IsLeaseExpiredCore/IsDetachGraceExpiredCore AND _activeEventSubscriberCount==0 under _syncRoot before transitioning to Closing; CloseExpiredLeasesAsync calls it before CloseSessionCoreAsync so a reattach that wins the race leaves the session Ready/usable. - Minors: lease-expiry-takes-precedence comment in CloseExpiredLeasesAsync; TOCTOU comment block; sweep-cycle latency note added to SessionOptions.DetachGraceSeconds XML doc and to GatewayConfiguration.md DetachGraceSeconds row. - New tests: TryBeginCloseIfExpired_ReattachedSubscriberWinsRace_DeclinesClose (GatewaySession), CloseExpiredLeasesAsync_DoesNotCloseSessionThatReattachedBeforeSweepCloses (SessionManager), plus IsLeaseExpiredCore/IsDetachGraceExpiredCore private helpers used by the guard.
This commit is contained in:
@@ -127,7 +127,7 @@ to avoid accidental large allocations from malformed or oversized frames.
|
|||||||
| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. |
|
| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. |
|
||||||
| `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. |
|
| `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. |
|
||||||
| `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. |
|
| `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. |
|
||||||
| `MxGateway:Sessions:DetachGraceSeconds` | `30` | Detach-grace retention window. When positive, a session whose last external (gRPC) event-stream subscriber drops is retained in `Ready` for this many seconds so a client can reconnect; if no external subscriber re-attaches within the window, the lease monitor closes it with `detach-grace-expired`. The internal dashboard mirror does not count as an external subscriber, so a dashboard-only session still enters detach-grace. `0` disables retention and reverts to closing only on normal lease expiry. Must be zero or greater. Reconnect/replay itself is implemented separately (Task 12); this option controls retention and expiry only. |
|
| `MxGateway:Sessions:DetachGraceSeconds` | `30` | Detach-grace retention window. When positive, a session whose last external (gRPC) event-stream subscriber drops is retained in `Ready` for this many seconds so a client can reconnect; if no external subscriber re-attaches within the window, the lease monitor closes it with `detach-grace-expired`. The internal dashboard mirror does not count as an external subscriber, so a dashboard-only session still enters detach-grace. `0` disables retention and reverts to closing only on normal lease expiry. Must be zero or greater. Reconnect/replay itself is implemented separately (Task 12); this option controls retention and expiry only. The effective close happens within the next sweep cycle after the window elapses — up to `LeaseSweepIntervalSeconds` after expiry. Operators wanting a firm minimum retention bound should set `DetachGraceSeconds` greater than `LeaseSweepIntervalSeconds`. |
|
||||||
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. |
|
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. |
|
||||||
| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. |
|
| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. |
|
||||||
|
|
||||||
|
|||||||
@@ -6,5 +6,6 @@ public sealed record EffectiveSessionConfiguration(
|
|||||||
int MaxPendingCommandsPerSession,
|
int MaxPendingCommandsPerSession,
|
||||||
int DefaultLeaseSeconds,
|
int DefaultLeaseSeconds,
|
||||||
int LeaseSweepIntervalSeconds,
|
int LeaseSweepIntervalSeconds,
|
||||||
|
int DetachGraceSeconds,
|
||||||
bool AllowMultipleEventSubscribers,
|
bool AllowMultipleEventSubscribers,
|
||||||
int MaxEventSubscribersPerSession);
|
int MaxEventSubscribersPerSession);
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
|
|||||||
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
|
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
|
||||||
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
|
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
|
||||||
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
|
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
|
||||||
|
DetachGraceSeconds: value.Sessions.DetachGraceSeconds,
|
||||||
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers,
|
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers,
|
||||||
MaxEventSubscribersPerSession: value.Sessions.MaxEventSubscribersPerSession),
|
MaxEventSubscribersPerSession: value.Sessions.MaxEventSubscribersPerSession),
|
||||||
Events: new EffectiveEventConfiguration(
|
Events: new EffectiveEventConfiguration(
|
||||||
|
|||||||
@@ -36,6 +36,13 @@ public sealed class SessionOptions
|
|||||||
/// expires. The reconnect/replay itself is implemented separately (Task 12); this
|
/// expires. The reconnect/replay itself is implemented separately (Task 12); this
|
||||||
/// option controls retention and expiry only.
|
/// option controls retention and expiry only.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// The effective close happens within the next sweep cycle after the window elapses —
|
||||||
|
/// up to <see cref="LeaseSweepIntervalSeconds"/> after expiry. Operators who want a
|
||||||
|
/// firm minimum bound should set <c>DetachGraceSeconds</c> greater than
|
||||||
|
/// <see cref="LeaseSweepIntervalSeconds"/>; otherwise a session whose window expires
|
||||||
|
/// just before a sweep run may be closed within seconds of detach.
|
||||||
|
/// </remarks>
|
||||||
public int DetachGraceSeconds { get; init; } = 30;
|
public int DetachGraceSeconds { get; init; } = 30;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -821,7 +821,7 @@ public sealed class GatewaySession
|
|||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
IWorkerClient workerClient = GetReadyWorkerClient();
|
IWorkerClient workerClient = GetReadyWorkerClient();
|
||||||
TouchClientActivity(DateTimeOffset.UtcNow);
|
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
|
||||||
|
|
||||||
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
|
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
@@ -1166,7 +1166,7 @@ public sealed class GatewaySession
|
|||||||
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
|
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
IWorkerClient workerClient = GetReadyWorkerClient();
|
IWorkerClient workerClient = GetReadyWorkerClient();
|
||||||
TouchClientActivity(DateTimeOffset.UtcNow);
|
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
|
||||||
|
|
||||||
return workerClient.ReadEventsAsync(cancellationToken);
|
return workerClient.ReadEventsAsync(cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -1260,6 +1260,74 @@ public sealed class GatewaySession
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Sweep-side entry point for closing an expired session. Under a single acquisition of
/// <c>_syncRoot</c> it re-evaluates expiry (lease expired OR detach-grace expired, each of
/// which also requires zero active event subscribers) and, only if the session is still
/// eligible, marks the close as started and moves the state to <c>Closing</c>.
/// </summary>
/// <param name="now">Timestamp against which lease and detach-grace expiry are re-evaluated.</param>
/// <param name="alreadyClosing">
/// Receives the value <c>_closeStarted</c> held before this call touched it when the session is
/// already <c>Closed</c> or when the transition is performed; receives <see langword="false"/>
/// when the call declines because the session is no longer eligible. A <see langword="true"/>
/// value means a close had already been started (same semantics as <see cref="CloseAsync"/>).
/// </param>
/// <returns>
/// <see langword="true"/> when this call set the state to <c>Closing</c> and the caller should
/// continue with teardown; <see langword="false"/> when the session is already <c>Closed</c> or
/// when neither expiry condition holds any more (for example because a subscriber re-attached
/// after the sweep loop's own eligibility check), in which case the session is left untouched.
/// </returns>
/// <remarks>
/// <para>
/// Why this exists: <c>CloseExpiredLeasesAsync</c> reads <see cref="IsLeaseExpired"/> /
/// <see cref="IsDetachGraceExpired"/> first and only later reaches <see cref="CloseAsync"/>,
/// which takes <c>_closeLock</c>. <see cref="AttachEventSubscriber"/> can run in that gap,
/// clearing <c>_detachedAtUtc</c> and raising <c>_activeEventSubscriberCount</c>, so the earlier
/// answer may be stale. Repeating the check and committing to <c>Closing</c> inside the same
/// <c>_syncRoot</c> critical section means a reattach that gets there first keeps the session
/// in <c>Ready</c> and usable.
/// </para>
/// </remarks>
internal bool TryBeginCloseIfExpired(DateTimeOffset now, out bool alreadyClosing)
{
    lock (_syncRoot)
    {
        bool isTerminal = _state is SessionState.Closed;

        // A session that is not yet Closed must still satisfy one of the expiry conditions.
        // The expiry helpers are deliberately not consulted for a Closed session.
        if (!isTerminal && !(IsLeaseExpiredCore(now) || IsDetachGraceExpiredCore(now)))
        {
            alreadyClosing = false;
            return false;
        }

        // Report whether a close was already under way before this call changes anything.
        alreadyClosing = _closeStarted;

        if (isTerminal)
        {
            return false;
        }

        _closeStarted = true;
        _state = SessionState.Closing;
        return true;
    }
}
|
||||||
|
|
||||||
|
// Non-locking expiry helper used by TryBeginCloseIfExpired. It does not acquire _syncRoot
// itself; the caller must already hold it.
//
// True when no event subscriber is active and a lease expiry is set that is at or before `now`.
private bool IsLeaseExpiredCore(DateTimeOffset now)
{
    if (_activeEventSubscriberCount != 0)
    {
        return false;
    }

    return _leaseExpiresAt is { } leaseDeadline && leaseDeadline <= now;
}
|
||||||
|
|
||||||
|
// Non-locking expiry helper used by TryBeginCloseIfExpired; the caller must already hold
// _syncRoot.
//
// True when detach-grace is enabled (_detachGrace is positive), no event subscriber is active,
// a detach timestamp is recorded, and at least _detachGrace has elapsed since it as of `now`.
private bool IsDetachGraceExpiredCore(DateTimeOffset now)
{
    bool graceEnabled = _detachGrace > TimeSpan.Zero;
    if (!graceEnabled || _activeEventSubscriberCount != 0)
    {
        return false;
    }

    return _detachedAtUtc is { } detachedAt && now - detachedAt >= _detachGrace;
}
|
||||||
|
|
||||||
// Final terminal transition; under _syncRoot to keep _state writes single-lock.
|
// Final terminal transition; under _syncRoot to keep _state writes single-lock.
|
||||||
// Closed is unconditionally terminal — TransitionTo refuses to overwrite it —
|
// Closed is unconditionally terminal — TransitionTo refuses to overwrite it —
|
||||||
// so we don't need to re-check the precondition here.
|
// so we don't need to re-check the precondition here.
|
||||||
|
|||||||
@@ -301,6 +301,12 @@ public sealed class SessionManager : ISessionManager
|
|||||||
// reconnected within DetachGraceSeconds). The detach-grace close is the same
|
// reconnected within DetachGraceSeconds). The detach-grace close is the same
|
||||||
// teardown as a lease-expiry close; only the reason differs so operators can tell
|
// teardown as a lease-expiry close; only the reason differs so operators can tell
|
||||||
// a short reconnect-window expiry from a long idle-lease expiry in logs/metrics.
|
// a short reconnect-window expiry from a long idle-lease expiry in logs/metrics.
|
||||||
|
// Lease-expiry takes PRECEDENCE over detach-grace when both conditions fire
|
||||||
|
// simultaneously (reason will be lease-expired, not detach-grace-expired).
|
||||||
|
//
|
||||||
|
// TOCTOU note: eligibility is re-verified atomically inside TryBeginCloseIfExpired
|
||||||
|
// under _syncRoot, so a client that reattaches a subscriber between the check below
|
||||||
|
// and the close call wins the race and the session is left open and usable.
|
||||||
string? reason = session.IsLeaseExpired(now)
|
string? reason = session.IsLeaseExpired(now)
|
||||||
? LeaseExpiredReason
|
? LeaseExpiredReason
|
||||||
: session.IsDetachGraceExpired(now)
|
: session.IsDetachGraceExpired(now)
|
||||||
@@ -311,6 +317,15 @@ public sealed class SessionManager : ISessionManager
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-verify eligibility atomically and begin the Closing transition before
|
||||||
|
// delegating to CloseSessionCoreAsync. If a subscriber reattached between the
|
||||||
|
// IsLeaseExpired/IsDetachGraceExpired check above and here, TryBeginCloseIfExpired
|
||||||
|
// returns false and we skip this session (it is no longer expired).
|
||||||
|
if (!session.TryBeginCloseIfExpired(now, out bool alreadyClosing) && !alreadyClosing)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
await CloseSessionCoreAsync(session, reason, cancellationToken).ConfigureAwait(false);
|
await CloseSessionCoreAsync(session, reason, cancellationToken).ConfigureAwait(false);
|
||||||
closedCount++;
|
closedCount++;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -577,6 +577,43 @@ public sealed class GatewaySessionTests
|
|||||||
Assert.True(session.IsDetachGraceExpired(clock.GetUtcNow()));
|
Assert.True(session.IsDetachGraceExpired(clock.GetUtcNow()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Task 11. Session-level check of the reconnect-vs-sweep race: the detach-grace window has
/// elapsed, but a subscriber re-attaches before <c>TryBeginCloseIfExpired</c> is invoked. The
/// call must decline (return false, report no close in flight) and the session must stay Ready.
/// </summary>
[Fact]
public async Task TryBeginCloseIfExpired_ReattachedSubscriberWinsRace_DeclinesClose()
{
    var workerClient = new FakeWorkerClient();
    var clock = new FakeTimeProvider(DateTimeOffset.UtcNow);
    await using GatewaySession session = CreateReadySessionWithDetachGrace(
        workerClient,
        clock,
        detachGrace: TimeSpan.FromSeconds(30));

    // Enter detach-grace by attaching and immediately dropping a subscriber, then move the
    // fake clock one second beyond the 30-second window.
    session.AttachEventSubscriber(maxSubscribers: 1).Dispose();
    clock.Advance(TimeSpan.FromSeconds(31));
    DateTimeOffset afterGrace = clock.GetUtcNow();

    // Sanity: at this instant a sweep would consider the session expired.
    Assert.True(session.IsDetachGraceExpired(afterGrace));

    // A client reconnects before the sweeper gets to TryBeginCloseIfExpired.
    using IDisposable reattached = session.AttachEventSubscriber(maxSubscribers: 1);
    Assert.Null(session.DetachedAtUtc);

    // Same (stale) timestamp the sweep would have used; the reattach must win.
    bool closeBegan = session.TryBeginCloseIfExpired(afterGrace, out bool closeInFlight);

    Assert.False(closeBegan);
    Assert.False(closeInFlight);
    Assert.Equal(SessionState.Ready, session.State);
}
|
||||||
|
|
||||||
private static GatewaySession CreateReadySessionWithDetachGrace(
|
private static GatewaySession CreateReadySessionWithDetachGrace(
|
||||||
IWorkerClient workerClient,
|
IWorkerClient workerClient,
|
||||||
TimeProvider timeProvider,
|
TimeProvider timeProvider,
|
||||||
|
|||||||
@@ -789,6 +789,47 @@ public sealed class SessionManagerTests
|
|||||||
Assert.Equal(1, workerClient.ShutdownCount);
|
Assert.Equal(1, workerClient.ShutdownCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Task 11. Manager-level check of the reconnect-vs-sweep race: a session's detach-grace window
/// has elapsed, but an external subscriber re-attaches before the sweep runs with the
/// post-expiry timestamp. The sweep must report zero closes, the session must remain Ready,
/// and the worker client must not have been shut down.
/// </summary>
[Fact]
public async Task CloseExpiredLeasesAsync_DoesNotCloseSessionThatReattachedBeforeSweepCloses()
{
    var workerClient = new FakeWorkerClient();
    var clock = new FakeTimeProvider(DateTimeOffset.UtcNow);
    SessionManager manager = CreateManager(
        new FakeSessionWorkerClientFactory(workerClient),
        options: CreateOptions(defaultLeaseSeconds: 1800, detachGraceSeconds: 30),
        timeProvider: clock);
    GatewaySession session = await manager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        ownerKeyId: null,
        CancellationToken.None);

    // Put the session into detach-grace: attach an external subscriber, then drop it.
    session.AttachEventSubscriber(maxSubscribers: 1).Dispose();
    Assert.NotNull(session.DetachedAtUtc);

    // Move one second beyond the 30-second grace window and capture the sweep timestamp.
    clock.Advance(TimeSpan.FromSeconds(31));
    DateTimeOffset afterGrace = clock.GetUtcNow();

    // A client re-attaches before the sweep gets to close the session.
    using IDisposable reattached = session.AttachEventSubscriber(maxSubscribers: 1);
    Assert.Null(session.DetachedAtUtc);

    // The sweep still runs with the post-expiry timestamp, yet must leave the session alone.
    int sweptCount = await manager.CloseExpiredLeasesAsync(afterGrace, CancellationToken.None);

    Assert.Equal(0, sweptCount);
    Assert.Equal(SessionState.Ready, session.State);
    Assert.Equal(0, workerClient.ShutdownCount);
}
|
||||||
|
|
||||||
/// <summary>Verifies that shutdown closes all registered sessions.</summary>
|
/// <summary>Verifies that shutdown closes all registered sessions.</summary>
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task ShutdownAsync_ClosesAllRegisteredSessions()
|
public async Task ShutdownAsync_ClosesAllRegisteredSessions()
|
||||||
|
|||||||
Reference in New Issue
Block a user