diff --git a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs
index 2e63b08..ae75937 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs
@@ -24,9 +24,21 @@ public sealed class DashboardEventBroadcaster(
return;
}
- Task send = hubContext.Clients
- .Group(EventsHub.GroupName(sessionId))
- .SendAsync(EventsHub.EventMessage, mxEvent);
+ // Wrap the Task acquisition in a try/catch so a hypothetical synchronous throw
+ // from SendAsync (e.g. an implementation that throws before returning the Task)
+ // cannot escape Publish. The interface contract is never-throw; fire-and-forget.
+ Task send;
+ try
+ {
+ send = hubContext.Clients
+ .Group(EventsHub.GroupName(sessionId))
+ .SendAsync(EventsHub.EventMessage, mxEvent);
+ }
+ catch (Exception ex)
+ {
+ logger.LogDebug(ex, "Dashboard event mirror to session {SessionId} threw synchronously.", sessionId);
+ return;
+ }
if (!send.IsCompletedSuccessfully)
{
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
index c20b624..086f2df 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
@@ -454,6 +454,14 @@ public sealed class GatewaySession
// a subscriber is always present at pump start — the dashboard receives events with no
// gRPC subscriber attached, and the Task 4 "zero-subscriber drain into the void" hang
// cannot occur. No-op when no dashboard broadcaster was supplied (unit tests).
+ //
+ // Race-safety (Issue 1): _dashboardMirrorLease and _dashboardMirrorTask are published
+ // atomically under a SINGLE second lock section, and DisposeAsync reads/nulls them under
+ // that same lock. After EnsureDistributorCreated/Register/StartPump (all outside _syncRoot
+ // to avoid lock inversion with the distributor's own lifecycle lock), we re-enter
+ // _syncRoot and check for concurrent disposal. If the session is already Closing/Closed/
+ // Faulted at that point, we dispose the just-created lease immediately and do NOT start
+ // the mirror task, so nothing is orphaned.
private void StartDashboardMirror()
{
IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster;
@@ -462,7 +470,6 @@ public sealed class GatewaySession
return;
}
- SessionEventDistributor distributor;
CancellationToken loopToken;
lock (_syncRoot)
{
@@ -480,18 +487,31 @@ public sealed class GatewaySession
// internal subscriber BEFORE starting the pump. isInternal: true keeps the dashboard
// subscriber out of the single-subscriber overflow accounting, so a slow/broken
// dashboard mirror only disconnects itself and never faults the session.
- distributor = EnsureDistributorCreated(out bool startNow);
+ // These three calls are OUTSIDE _syncRoot to avoid holding it across
+ // EnsureDistributorCreated's own lock and StartAsync's Task.Run.
+ SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow);
IEventSubscriberLease lease = distributor.Register(isInternal: true);
StartPumpIfRequested(distributor, startNow);
+ // Publish BOTH the lease and the task atomically under one lock section so
+ // DisposeAsync always sees them in a consistent state: either both are set or
+ // both are null. If the session already started disposal before we got here,
+ // dispose the lease immediately instead of orphaning it.
lock (_syncRoot)
{
- _dashboardMirrorLease = lease;
- }
+ if (_state is SessionState.Closing or SessionState.Closed or SessionState.Faulted)
+ {
+ // Disposal already ran (or is in progress) — discard the just-created
+ // lease now so it is not orphaned. Do NOT launch the mirror task.
+ lease.Dispose();
+ return;
+ }
- _dashboardMirrorTask = Task.Run(
- () => RunDashboardMirrorAsync(broadcaster, lease, loopToken),
- CancellationToken.None);
+ _dashboardMirrorLease = lease;
+ _dashboardMirrorTask = Task.Run(
+ () => RunDashboardMirrorAsync(broadcaster, lease, loopToken),
+ CancellationToken.None);
+ }
}
// Reads the internal dashboard subscriber's channel and publishes each RAW fanned event
@@ -556,7 +576,7 @@ public sealed class GatewaySession
// offending subscriber with an EventQueueOverflow fault; this handler adds the
// observable side effects, preserving exactly what the pre-epic per-RPC overflow path
// emitted:
- // - always record the queue-overflow metric;
+ // - always record the queue-overflow metric, labeled by subscriber kind;
// - FailFast in the legacy single-subscriber case (isOnlySubscriber): fault the whole
// session and record the fault metric, matching back-compat behavior;
// - FailFast with multiple subscribers, or DisconnectSubscriber in any case: do NOT
@@ -564,16 +584,23 @@ public sealed class GatewaySession
// whole remedy, so other subscribers and the pump are unaffected. Multi-subscriber
// FailFast deliberately degrades to a disconnect because faulting a shared session on
// one slow consumer would punish healthy subscribers.
+ // The delegate now carries isInternal directly (Issue 4), so the metric label is chosen
+ // without any heuristic: "dashboard-mirror" for internal, "grpc-event-stream" for external.
private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy)
{
GatewayMetrics metrics = _eventStreaming.Metrics;
- return isOnlySubscriber =>
+ string sessionId = SessionId;
+ return (isOnlySubscriber, isInternal) =>
{
- metrics.QueueOverflow("grpc-event-stream");
+ // Label the overflow metric by subscriber kind. The distributor passes isInternal
+ // directly, so no heuristic is needed to distinguish an internal overflow (the
+ // gateway-owned dashboard mirror) from an external one (a gRPC streaming client).
+ string label = isInternal ? "dashboard-mirror" : "grpc-event-stream";
+ metrics.QueueOverflow(label);
if (policy == EventBackpressurePolicy.FailFast && isOnlySubscriber)
{
- MarkFaulted($"Session {SessionId} event stream queue overflowed.");
+ MarkFaulted($"Session {sessionId} event stream queue overflowed.");
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
}
};
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
index 6d90d32..dd79de8 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
@@ -17,8 +17,15 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults
/// the session only in this case; with multiple subscribers FailFast degrades to a
/// per-subscriber disconnect so one slow consumer never faults a session shared by others.
+/// Always for internal subscribers (the dashboard mirror) because
+/// excludes them from the external-subscriber count.
///
-public delegate void SubscriberOverflowHandler(bool isOnlySubscriber);
+///
+/// when the overflowing subscriber is the gateway-owned internal
+/// dashboard mirror subscriber. The handler uses this to choose the correct metric label
+/// ("dashboard-mirror" vs "grpc-event-stream").
+///
+public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInternal);
///
/// Per-session event pump and fan-out. A single background task drains the
@@ -440,9 +447,10 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// Observability + session-fault decision. Errors here must not stall the pump or
// leave the subscriber attached, so the disconnect below runs regardless.
+ // Pass subscriber.IsInternal so the handler can choose the correct metric label.
try
{
- _overflowHandler?.Invoke(isOnlySubscriber);
+ _overflowHandler?.Invoke(isOnlySubscriber, subscriber.IsInternal);
}
catch (Exception exception)
{
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs
index 327ff3b..b458791 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs
@@ -204,12 +204,19 @@ public sealed class GatewaySessionDashboardMirrorTests
return new WorkerEvent { Event = mxEvent };
}
- private static async Task WaitUntilAsync(Func predicate)
+ private static async Task WaitUntilAsync(Func predicate, [CallerArgumentExpression(nameof(predicate))] string? condition = null)
{
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
- while (!predicate())
+ try
{
- await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
+ while (!predicate())
+ {
+ await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ Assert.Fail($"Timed out after {TestTimeout.TotalSeconds}s waiting for: {condition}");
}
}
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
index 835f19f..58ca40b 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
@@ -350,7 +350,7 @@ public sealed class SessionEventDistributorTests
replayRetentionSeconds: 0,
NullLogger.Instance,
TimeProvider.System,
- isOnlySubscriber =>
+ (isOnlySubscriber, _) =>
{
Interlocked.Increment(ref overflowCalls);
Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
@@ -415,7 +415,7 @@ public sealed class SessionEventDistributorTests
replayRetentionSeconds: 0,
NullLogger.Instance,
TimeProvider.System,
- isOnlySubscriber =>
+ (isOnlySubscriber, _) =>
{
if (!isOnlySubscriber)
{
@@ -458,6 +458,68 @@ public sealed class SessionEventDistributorTests
Assert.Equal(11ul, afterOverflow.WorkerSequence);
}
+ [Fact]
+ public async Task InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal()
+ {
+ // Issue 3: verifies that CountExternalSubscribers() excludes the internal dashboard
+ // subscriber, so a FailFast policy would NOT fault the session even when the internal
+ // subscriber is the ONLY registered subscriber. The overflow handler receives
+ // isOnlySubscriber==false (not true) because the overflowing subscriber is internal
+ // and is therefore excluded from the external-subscriber count.
+ Channel source = Channel.CreateUnbounded();
+ int observedIsOnlySubscriberSet = 0;
+ bool observedIsOnlySubscriberValue = false;
+ bool observedIsInternalValue = false;
+ await using SessionEventDistributor distributor = new(
+ "session-internal-overflow",
+ ct => source.Reader.ReadAllAsync(ct),
+ subscriberQueueCapacity: 2,
+ replayBufferCapacity: 0,
+ replayRetentionSeconds: 0,
+ NullLogger.Instance,
+ TimeProvider.System,
+ (isOnlySubscriber, isInternal) =>
+ {
+ Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
+ Volatile.Write(ref observedIsInternalValue, isInternal);
+ Volatile.Write(ref observedIsOnlySubscriberSet, 1);
+ });
+ await distributor.StartAsync(CancellationToken.None);
+
+ // Register ONLY an internal subscriber — no external subscriber is attached.
+ using IEventSubscriberLease internalLease = distributor.Register(isInternal: true);
+
+ // Push enough events to overflow the capacity-2 internal subscriber channel.
+ for (ulong sequence = 1; sequence <= 10; sequence++)
+ {
+ source.Writer.TryWrite(Event(sequence));
+ }
+
+ // The internal subscriber is disconnected with the overflow fault.
+ SessionManagerException fault = await Assert.ThrowsAsync(
+ async () => await DrainUntilFaultAsync(internalLease.Reader));
+ Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
+
+ // Wait for the handler to fire (it runs on the pump thread).
+ await Task.Run(async () =>
+ {
+ using CancellationTokenSource cts = new(ReadTimeout);
+ while (Volatile.Read(ref observedIsOnlySubscriberSet) == 0)
+ {
+ await Task.Delay(10, cts.Token);
+ }
+ });
+
+ // isOnlySubscriber must be FALSE even though the internal subscriber was the ONLY
+ // subscriber — CountExternalSubscribers excludes it, so a FailFast policy on the
+ // external count would NOT fault the session.
+ Assert.True(Volatile.Read(ref observedIsOnlySubscriberSet) == 1, "Overflow handler should have fired.");
+ Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue),
+ "isOnlySubscriber must be false for an internal subscriber (CountExternalSubscribers excludes it).");
+ Assert.True(Volatile.Read(ref observedIsInternalValue),
+ "isInternal must be true for a subscriber registered with isInternal: true.");
+ }
+
private static async Task DrainUntilFaultAsync(ChannelReader reader)
{
// Drains any buffered events, then surfaces the channel's completion fault (if any)