From 7773bdebbdbf67187e74a3f76883e3fe4dd5d8c6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 12:37:39 -0400 Subject: [PATCH] fix(sessions): close SessionEventDistributor dispose/register races + add overflow logging --- .../Sessions/SessionEventDistributor.cs | 96 +++++++++++++------ .../Sessions/SessionEventDistributorTests.cs | 12 +++ 2 files changed, 81 insertions(+), 27 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index f4f1d60..038a510 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -43,9 +43,17 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions; /// public sealed class SessionEventDistributor : IAsyncDisposable { + /// + /// Bounded wait for the pump to stop during disposal. A source factory that + /// ignores cancellation must not hang dispose forever; after this window the + /// pump is abandoned and subscribers are completed anyway. + /// + private static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(5); + private readonly string _sessionId; private readonly Func> _eventSourceFactory; private readonly int _subscriberQueueCapacity; + private readonly TimeSpan _shutdownTimeout; private readonly ILogger _logger; private readonly ConcurrentDictionary _subscribers = new(); private readonly CancellationTokenSource _shutdownCts = new(); @@ -84,6 +92,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable _sessionId = sessionId; _eventSourceFactory = eventSourceFactory; _subscriberQueueCapacity = subscriberQueueCapacity; + _shutdownTimeout = DefaultShutdownTimeout; _logger = logger; } @@ -123,14 +132,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable /// public IEventSubscriberLease Register() { - lock (_lifecycleLock) - { - ObjectDisposedException.ThrowIf(_disposed, this); - } - // The pump is the single writer for this channel; readers are single-consumer // (one gRPC stream / dashboard subscriber). Synchronous continuations are // disabled so a slow reader can never stall the pump on its completion. + // + // FullMode is Wait but the pump currently writes with TryWrite (drop-on-full): + // these are deliberately opposite policies and only a placeholder. Task 5 owns + // the overflow policy and will reconcile them by either switching the pump to + // WriteAsync (true backpressure, honouring Wait) or changing this to a Drop mode. + // Do not "fix" the mismatch here — leave the decision to Task 5. Channel channel = Channel.CreateBounded( new BoundedChannelOptions(_subscriberQueueCapacity) { @@ -142,10 +152,18 @@ public sealed class SessionEventDistributor : IAsyncDisposable long id = Interlocked.Increment(ref _nextSubscriberId); Subscriber subscriber = new(id, channel); - _subscribers[id] = subscriber; - // Disposal between the add and a concurrent DisposeAsync: DisposeAsync drains - // and completes every subscriber currently in the map, so this entry is covered. + // The disposed check AND the map add happen under the same lock with no await + // in between. DisposeAsync sets _disposed=true under this same lock before it + // calls CompleteAllSubscribers, so once disposal has begun no further subscriber + // can be added — closing the Register-after-DisposeAsync window that would + // otherwise leave a subscriber's channel never completed. + lock (_lifecycleLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + _subscribers[id] = subscriber; + } + return new SubscriberLease(this, subscriber); } @@ -172,20 +190,34 @@ public sealed class SessionEventDistributor : IAsyncDisposable if (pumpTask is not null) { - try + // Bound the wait: a source factory that ignores cancellation would otherwise + // hang dispose forever. If the pump does not stop in time we log and proceed + // to complete subscribers anyway; DisposeAsync must not throw on this path. + Task completed = await Task.WhenAny(pumpTask, Task.Delay(_shutdownTimeout)).ConfigureAwait(false); + if (!ReferenceEquals(completed, pumpTask)) { - await pumpTask.ConfigureAwait(false); - } - catch (OperationCanceledException) - { - } - catch (Exception exception) - { - _logger.LogDebug( - exception, - "Event distributor pump faulted during shutdown for session {SessionId}.", + _logger.LogWarning( + "Event distributor pump did not stop within {ShutdownTimeoutSeconds}s for session {SessionId}; completing subscribers and abandoning the pump.", + _shutdownTimeout.TotalSeconds, _sessionId); } + else + { + try + { + await pumpTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.LogDebug( + exception, + "Event distributor pump faulted during shutdown for session {SessionId}.", + _sessionId); + } + } } CompleteAllSubscribers(error: null); @@ -209,7 +241,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable // drop / disconnect / fault that one subscriber). For the Task 2 // skeleton, a non-blocking TryWrite that silently drops on a full // channel is the placeholder so one slow reader never stalls the pump. - subscriber.Channel.Writer.TryWrite(mxEvent); + if (!subscriber.Channel.Writer.TryWrite(mxEvent)) + { + // Visibility only — Task 5 owns the actual drop/backpressure policy. + // Logs identifiers (worker sequence, subscriber id, session) only, + // never the event payload or tag values. + _logger.LogDebug( + "Event distributor dropped event (worker sequence {WorkerSequence}) for subscriber {SubscriberId} in session {SessionId}: channel full.", + mxEvent.WorkerSequence, + subscriber.Id, + _sessionId); + } } } @@ -221,7 +263,9 @@ public sealed class SessionEventDistributor : IAsyncDisposable } catch (Exception exception) { - _logger.LogDebug( + // Unexpected source fault (not the shutdown-cancellation path above) — visible + // by default so an event stream silently dying is not lost in Debug noise. + _logger.LogError( exception, "Event distributor source faulted for session {SessionId}.", _sessionId); @@ -255,19 +299,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber) : IEventSubscriberLease { - private bool _disposed; + private int _leaseDisposed; public ChannelReader Reader => subscriber.Channel.Reader; public void Dispose() { - if (_disposed) + // Atomic check-and-set so concurrent Dispose calls unregister at most once. + if (Interlocked.Exchange(ref _leaseDisposed, 1) == 0) { - return; + distributor.Unregister(subscriber); } - - _disposed = true; - distributor.Unregister(subscriber); } } } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 05d596a..8842d1b 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -100,6 +100,18 @@ public sealed class SessionEventDistributorTests await AssertCompletedAsync(leaseB.Reader); } + [Fact] + public async Task Register_AfterDispose_ThrowsObjectDisposedException() + { + Channel source = Channel.CreateUnbounded(); + SessionEventDistributor distributor = CreateDistributor(source.Reader); + await distributor.StartAsync(CancellationToken.None); + + await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout); + + Assert.Throws(() => distributor.Register()); + } + private static SessionEventDistributor CreateDistributor(ChannelReader source) => new( "session-test",