fix(sessions): close SessionEventDistributor dispose/register races + add overflow logging

This commit is contained in:
Joseph Doherty
2026-06-15 12:37:39 -04:00
parent c79b292968
commit 7773bdebbd
2 changed files with 81 additions and 27 deletions
@@ -43,9 +43,17 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// </remarks> /// </remarks>
public sealed class SessionEventDistributor : IAsyncDisposable public sealed class SessionEventDistributor : IAsyncDisposable
{ {
/// <summary>
/// Bounded wait for the pump to stop during disposal. A source factory that
/// ignores cancellation must not hang dispose forever; after this window the
/// pump is abandoned and subscribers are completed anyway.
/// </summary>
private static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(5);
private readonly string _sessionId; private readonly string _sessionId;
private readonly Func<CancellationToken, IAsyncEnumerable<MxEvent>> _eventSourceFactory; private readonly Func<CancellationToken, IAsyncEnumerable<MxEvent>> _eventSourceFactory;
private readonly int _subscriberQueueCapacity; private readonly int _subscriberQueueCapacity;
private readonly TimeSpan _shutdownTimeout;
private readonly ILogger<SessionEventDistributor> _logger; private readonly ILogger<SessionEventDistributor> _logger;
private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new(); private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new();
private readonly CancellationTokenSource _shutdownCts = new(); private readonly CancellationTokenSource _shutdownCts = new();
@@ -84,6 +92,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
_sessionId = sessionId; _sessionId = sessionId;
_eventSourceFactory = eventSourceFactory; _eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity; _subscriberQueueCapacity = subscriberQueueCapacity;
_shutdownTimeout = DefaultShutdownTimeout;
_logger = logger; _logger = logger;
} }
@@ -123,14 +132,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable
/// </summary> /// </summary>
public IEventSubscriberLease Register() public IEventSubscriberLease Register()
{ {
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
}
// The pump is the single writer for this channel; readers are single-consumer // The pump is the single writer for this channel; readers are single-consumer
// (one gRPC stream / dashboard subscriber). Synchronous continuations are // (one gRPC stream / dashboard subscriber). Synchronous continuations are
// disabled so a slow reader can never stall the pump on its completion. // disabled so a slow reader can never stall the pump on its completion.
//
// FullMode is Wait but the pump currently writes with TryWrite (drop-on-full):
// these are deliberately opposite policies and only a placeholder. Task 5 owns
// the overflow policy and will reconcile them by either switching the pump to
// WriteAsync (true backpressure, honouring Wait) or changing this to a Drop mode.
// Do not "fix" the mismatch here — leave the decision to Task 5.
Channel<MxEvent> channel = Channel.CreateBounded<MxEvent>( Channel<MxEvent> channel = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(_subscriberQueueCapacity) new BoundedChannelOptions(_subscriberQueueCapacity)
{ {
@@ -142,10 +152,18 @@ public sealed class SessionEventDistributor : IAsyncDisposable
long id = Interlocked.Increment(ref _nextSubscriberId); long id = Interlocked.Increment(ref _nextSubscriberId);
Subscriber subscriber = new(id, channel); Subscriber subscriber = new(id, channel);
_subscribers[id] = subscriber;
// Disposal between the add and a concurrent DisposeAsync: DisposeAsync drains // The disposed check AND the map add happen under the same lock with no await
// and completes every subscriber currently in the map, so this entry is covered. // in between. DisposeAsync sets _disposed=true under this same lock before it
// calls CompleteAllSubscribers, so once disposal has begun no further subscriber
// can be added — closing the Register-after-DisposeAsync window that would
// otherwise leave a subscriber's channel never completed.
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
_subscribers[id] = subscriber;
}
return new SubscriberLease(this, subscriber); return new SubscriberLease(this, subscriber);
} }
@@ -172,20 +190,34 @@ public sealed class SessionEventDistributor : IAsyncDisposable
if (pumpTask is not null) if (pumpTask is not null)
{ {
try // Bound the wait: a source factory that ignores cancellation would otherwise
// hang dispose forever. If the pump does not stop in time we log and proceed
// to complete subscribers anyway; DisposeAsync must not throw on this path.
Task completed = await Task.WhenAny(pumpTask, Task.Delay(_shutdownTimeout)).ConfigureAwait(false);
if (!ReferenceEquals(completed, pumpTask))
{ {
await pumpTask.ConfigureAwait(false); _logger.LogWarning(
} "Event distributor pump did not stop within {ShutdownTimeoutSeconds}s for session {SessionId}; completing subscribers and abandoning the pump.",
catch (OperationCanceledException) _shutdownTimeout.TotalSeconds,
{
}
catch (Exception exception)
{
_logger.LogDebug(
exception,
"Event distributor pump faulted during shutdown for session {SessionId}.",
_sessionId); _sessionId);
} }
else
{
try
{
await pumpTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.LogDebug(
exception,
"Event distributor pump faulted during shutdown for session {SessionId}.",
_sessionId);
}
}
} }
CompleteAllSubscribers(error: null); CompleteAllSubscribers(error: null);
@@ -209,7 +241,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// drop / disconnect / fault that one subscriber). For the Task 2 // drop / disconnect / fault that one subscriber). For the Task 2
// skeleton, a non-blocking TryWrite that silently drops on a full // skeleton, a non-blocking TryWrite that silently drops on a full
// channel is the placeholder so one slow reader never stalls the pump. // channel is the placeholder so one slow reader never stalls the pump.
subscriber.Channel.Writer.TryWrite(mxEvent); if (!subscriber.Channel.Writer.TryWrite(mxEvent))
{
// Visibility only — Task 5 owns the actual drop/backpressure policy.
// Logs identifiers (worker sequence, subscriber id, session) only,
// never the event payload or tag values.
_logger.LogDebug(
"Event distributor dropped event (worker sequence {WorkerSequence}) for subscriber {SubscriberId} in session {SessionId}: channel full.",
mxEvent.WorkerSequence,
subscriber.Id,
_sessionId);
}
} }
} }
@@ -221,7 +263,9 @@ public sealed class SessionEventDistributor : IAsyncDisposable
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.LogDebug( // Unexpected source fault (not the shutdown-cancellation path above) — visible
// by default so an event stream silently dying is not lost in Debug noise.
_logger.LogError(
exception, exception,
"Event distributor source faulted for session {SessionId}.", "Event distributor source faulted for session {SessionId}.",
_sessionId); _sessionId);
@@ -255,19 +299,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable
private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber) private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber)
: IEventSubscriberLease : IEventSubscriberLease
{ {
private bool _disposed; private int _leaseDisposed;
public ChannelReader<MxEvent> Reader => subscriber.Channel.Reader; public ChannelReader<MxEvent> Reader => subscriber.Channel.Reader;
public void Dispose() public void Dispose()
{ {
if (_disposed) // Atomic check-and-set so concurrent Dispose calls unregister at most once.
if (Interlocked.Exchange(ref _leaseDisposed, 1) == 0)
{ {
return; distributor.Unregister(subscriber);
} }
_disposed = true;
distributor.Unregister(subscriber);
} }
} }
} }
@@ -100,6 +100,18 @@ public sealed class SessionEventDistributorTests
await AssertCompletedAsync(leaseB.Reader); await AssertCompletedAsync(leaseB.Reader);
} }
[Fact]
public async Task Register_AfterDispose_ThrowsObjectDisposedException()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
Assert.Throws<ObjectDisposedException>(() => distributor.Register());
}
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source) private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
=> new( => new(
"session-test", "session-test",