diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
index f4f1d60..038a510 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
@@ -43,9 +43,17 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
///
public sealed class SessionEventDistributor : IAsyncDisposable
{
+ ///
+ /// Bounded wait for the pump to stop during disposal. A source factory that
+ /// ignores cancellation must not hang dispose forever; after this window the
+ /// pump is abandoned and subscribers are completed anyway.
+ ///
+ private static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(5);
+
private readonly string _sessionId;
private readonly Func> _eventSourceFactory;
private readonly int _subscriberQueueCapacity;
+ private readonly TimeSpan _shutdownTimeout;
private readonly ILogger _logger;
private readonly ConcurrentDictionary _subscribers = new();
private readonly CancellationTokenSource _shutdownCts = new();
@@ -84,6 +92,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
_sessionId = sessionId;
_eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity;
+ _shutdownTimeout = DefaultShutdownTimeout;
_logger = logger;
}
@@ -123,14 +132,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable
///
public IEventSubscriberLease Register()
{
- lock (_lifecycleLock)
- {
- ObjectDisposedException.ThrowIf(_disposed, this);
- }
-
// The pump is the single writer for this channel; readers are single-consumer
// (one gRPC stream / dashboard subscriber). Synchronous continuations are
// disabled so a slow reader can never stall the pump on its completion.
+ //
+ // FullMode is Wait but the pump currently writes with TryWrite (drop-on-full):
+ // these are deliberately opposite policies and only a placeholder. Task 5 owns
+ // the overflow policy and will reconcile them by either switching the pump to
+ // WriteAsync (true backpressure, honouring Wait) or changing this to a Drop mode.
+ // Do not "fix" the mismatch here — leave the decision to Task 5.
Channel channel = Channel.CreateBounded(
new BoundedChannelOptions(_subscriberQueueCapacity)
{
@@ -142,10 +152,18 @@ public sealed class SessionEventDistributor : IAsyncDisposable
long id = Interlocked.Increment(ref _nextSubscriberId);
Subscriber subscriber = new(id, channel);
- _subscribers[id] = subscriber;
- // Disposal between the add and a concurrent DisposeAsync: DisposeAsync drains
- // and completes every subscriber currently in the map, so this entry is covered.
+ // The disposed check AND the map add happen under the same lock with no await
+ // in between. DisposeAsync sets _disposed=true under this same lock before it
+ // calls CompleteAllSubscribers, so once disposal has begun no further subscriber
+ // can be added — closing the Register-after-DisposeAsync window that would
+ // otherwise leave a subscriber's channel never completed.
+ lock (_lifecycleLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ _subscribers[id] = subscriber;
+ }
+
return new SubscriberLease(this, subscriber);
}
@@ -172,20 +190,34 @@ public sealed class SessionEventDistributor : IAsyncDisposable
if (pumpTask is not null)
{
- try
+ // Bound the wait: a source factory that ignores cancellation would otherwise
+ // hang dispose forever. If the pump does not stop in time we log and proceed
+ // to complete subscribers anyway; DisposeAsync must not throw on this path.
+ Task completed = await Task.WhenAny(pumpTask, Task.Delay(_shutdownTimeout)).ConfigureAwait(false);
+ if (!ReferenceEquals(completed, pumpTask))
{
- await pumpTask.ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- }
- catch (Exception exception)
- {
- _logger.LogDebug(
- exception,
- "Event distributor pump faulted during shutdown for session {SessionId}.",
+ _logger.LogWarning(
+ "Event distributor pump did not stop within {ShutdownTimeoutSeconds}s for session {SessionId}; completing subscribers and abandoning the pump.",
+ _shutdownTimeout.TotalSeconds,
_sessionId);
}
+ else
+ {
+ try
+ {
+ await pumpTask.ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ catch (Exception exception)
+ {
+ _logger.LogDebug(
+ exception,
+ "Event distributor pump faulted during shutdown for session {SessionId}.",
+ _sessionId);
+ }
+ }
}
CompleteAllSubscribers(error: null);
@@ -209,7 +241,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// drop / disconnect / fault that one subscriber). For the Task 2
// skeleton, a non-blocking TryWrite that silently drops on a full
// channel is the placeholder so one slow reader never stalls the pump.
- subscriber.Channel.Writer.TryWrite(mxEvent);
+ if (!subscriber.Channel.Writer.TryWrite(mxEvent))
+ {
+ // Visibility only — Task 5 owns the actual drop/backpressure policy.
+ // Logs identifiers (worker sequence, subscriber id, session) only,
+ // never the event payload or tag values.
+ _logger.LogDebug(
+ "Event distributor dropped event (worker sequence {WorkerSequence}) for subscriber {SubscriberId} in session {SessionId}: channel full.",
+ mxEvent.WorkerSequence,
+ subscriber.Id,
+ _sessionId);
+ }
}
}
@@ -221,7 +263,9 @@ public sealed class SessionEventDistributor : IAsyncDisposable
}
catch (Exception exception)
{
- _logger.LogDebug(
+ // Unexpected source fault (not the shutdown-cancellation path above) — visible
+ // by default so an event stream silently dying is not lost in Debug noise.
+ _logger.LogError(
exception,
"Event distributor source faulted for session {SessionId}.",
_sessionId);
@@ -255,19 +299,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable
private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber)
: IEventSubscriberLease
{
- private bool _disposed;
+ private int _leaseDisposed;
public ChannelReader Reader => subscriber.Channel.Reader;
public void Dispose()
{
- if (_disposed)
+ // Atomic check-and-set so concurrent Dispose calls unregister at most once.
+ if (Interlocked.Exchange(ref _leaseDisposed, 1) == 0)
{
- return;
+ distributor.Unregister(subscriber);
}
-
- _disposed = true;
- distributor.Unregister(subscriber);
}
}
}
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
index 05d596a..8842d1b 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
@@ -100,6 +100,18 @@ public sealed class SessionEventDistributorTests
await AssertCompletedAsync(leaseB.Reader);
}
+ [Fact]
+ public async Task Register_AfterDispose_ThrowsObjectDisposedException()
+ {
+ Channel source = Channel.CreateUnbounded();
+ SessionEventDistributor distributor = CreateDistributor(source.Reader);
+ await distributor.StartAsync(CancellationToken.None);
+
+ await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
+
+ Assert.Throws(() => distributor.Register());
+ }
+
private static SessionEventDistributor CreateDistributor(ChannelReader source)
=> new(
"session-test",