From 2ead9bc2008059aa6dcdf80185f98688032d1423 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 15:02:36 -0400 Subject: [PATCH] fix(dashboard): close StartDashboardMirror/DisposeAsync race; internal-overflow test + metric label (1) GatewaySession.StartDashboardMirror: publish _dashboardMirrorLease and _dashboardMirrorTask atomically under one _syncRoot section; if the session is already Closing/Closed/Faulted, dispose the just-created lease and return without starting the mirror task so nothing is orphaned. (2) WaitUntilAsync test helper: catch OperationCanceledException and call Assert.Fail with the timeout duration and predicate source text instead of letting the exception propagate raw. (3) New SessionEventDistributorTests.InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse: verifies CountExternalSubscribers excludes the internal subscriber, so isOnlySubscriber==false even when the internal subscriber is the only registered subscriber. (4) SubscriberOverflowHandler delegate gains isInternal parameter; overflow metric label is "dashboard-mirror" for internal subscribers and "grpc-event-stream" for external ones. (5) DashboardEventBroadcaster.Publish: wrap SendAsync Task acquisition in try/catch so a synchronous throw cannot escape the never-throw Publish interface contract. --- .../Hubs/DashboardEventBroadcaster.cs | 18 ++++- .../Sessions/GatewaySession.cs | 49 ++++++++++---- .../Sessions/SessionEventDistributor.cs | 12 +++- .../GatewaySessionDashboardMirrorTests.cs | 13 +++- .../Sessions/SessionEventDistributorTests.cs | 66 ++++++++++++++++++- 5 files changed, 137 insertions(+), 21 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs index 2e63b08..ae75937 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs @@ -24,9 +24,21 @@ public sealed class DashboardEventBroadcaster( return; } - Task send = hubContext.Clients - .Group(EventsHub.GroupName(sessionId)) - .SendAsync(EventsHub.EventMessage, mxEvent); + // Wrap the Task acquisition in a try/catch so a hypothetical synchronous throw + // from SendAsync (e.g. an implementation that throws before returning the Task) + // cannot escape Publish. The interface contract is never-throw; fire-and-forget. + Task send; + try + { + send = hubContext.Clients + .Group(EventsHub.GroupName(sessionId)) + .SendAsync(EventsHub.EventMessage, mxEvent); + } + catch (Exception ex) + { + logger.LogDebug(ex, "Dashboard event mirror to session {SessionId} threw synchronously.", sessionId); + return; + } if (!send.IsCompletedSuccessfully) { diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index c20b624..086f2df 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -454,6 +454,14 @@ public sealed class GatewaySession // a subscriber is always present at pump start — the dashboard receives events with no // gRPC subscriber attached, and the Task 4 "zero-subscriber drain into the void" hang // cannot occur. No-op when no dashboard broadcaster was supplied (unit tests). + // + // Race-safety (Issue 1): _dashboardMirrorLease and _dashboardMirrorTask are published + // atomically under a SINGLE second lock section, and DisposeAsync reads/nulls them under + // that same lock. After EnsureDistributorCreated/Register/StartPump (all outside _syncRoot + // to avoid lock inversion with the distributor's own lifecycle lock), we re-enter + // _syncRoot and check for concurrent disposal. If the session is already Closing/Closed/ + // Faulted at that point, we dispose the just-created lease immediately and do NOT start + // the mirror task, so nothing is orphaned. private void StartDashboardMirror() { IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster; @@ -462,7 +470,6 @@ public sealed class GatewaySession return; } - SessionEventDistributor distributor; CancellationToken loopToken; lock (_syncRoot) { @@ -480,18 +487,31 @@ public sealed class GatewaySession // internal subscriber BEFORE starting the pump. isInternal: true keeps the dashboard // subscriber out of the single-subscriber overflow accounting, so a slow/broken // dashboard mirror only disconnects itself and never faults the session. - distributor = EnsureDistributorCreated(out bool startNow); + // These three calls are OUTSIDE _syncRoot to avoid holding it across + // EnsureDistributorCreated's own lock and StartAsync's Task.Run. + SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow); IEventSubscriberLease lease = distributor.Register(isInternal: true); StartPumpIfRequested(distributor, startNow); + // Publish BOTH the lease and the task atomically under one lock section so + // DisposeAsync always sees them in a consistent state: either both are set or + // both are null. If the session already started disposal before we got here, + // dispose the lease immediately instead of orphaning it. lock (_syncRoot) { - _dashboardMirrorLease = lease; - } + if (_state is SessionState.Closing or SessionState.Closed or SessionState.Faulted) + { + // Disposal already ran (or is in progress) — discard the just-created + // lease now so it is not orphaned. Do NOT launch the mirror task. + lease.Dispose(); + return; + } - _dashboardMirrorTask = Task.Run( - () => RunDashboardMirrorAsync(broadcaster, lease, loopToken), - CancellationToken.None); + _dashboardMirrorLease = lease; + _dashboardMirrorTask = Task.Run( + () => RunDashboardMirrorAsync(broadcaster, lease, loopToken), + CancellationToken.None); + } } // Reads the internal dashboard subscriber's channel and publishes each RAW fanned event @@ -556,7 +576,7 @@ public sealed class GatewaySession // offending subscriber with an EventQueueOverflow fault; this handler adds the // observable side effects, preserving exactly what the pre-epic per-RPC overflow path // emitted: - // - always record the queue-overflow metric; + // - always record the queue-overflow metric, labeled by subscriber kind; // - FailFast in the legacy single-subscriber case (isOnlySubscriber): fault the whole // session and record the fault metric, matching back-compat behavior; // - FailFast with multiple subscribers, or DisconnectSubscriber in any case: do NOT @@ -564,16 +584,23 @@ public sealed class GatewaySession // whole remedy, so other subscribers and the pump are unaffected. Multi-subscriber // FailFast deliberately degrades to a disconnect because faulting a shared session on // one slow consumer would punish healthy subscribers. + // The delegate now carries isInternal directly (Issue 4), so the metric label is chosen + // without any heuristic: "dashboard-mirror" for internal, "grpc-event-stream" for external. private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy) { GatewayMetrics metrics = _eventStreaming.Metrics; - return isOnlySubscriber => + string sessionId = SessionId; + return (isOnlySubscriber, isInternal) => { - metrics.QueueOverflow("grpc-event-stream"); + // Label the overflow metric by subscriber kind. The distributor passes isInternal + // directly, so no heuristic is needed to distinguish an internal overflow (the + // gateway-owned dashboard mirror) from an external one (a gRPC streaming client). + string label = isInternal ? "dashboard-mirror" : "grpc-event-stream"; + metrics.QueueOverflow(label); if (policy == EventBackpressurePolicy.FailFast && isOnlySubscriber) { - MarkFaulted($"Session {SessionId} event stream queue overflowed."); + MarkFaulted($"Session {sessionId} event stream queue overflowed."); metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString()); } }; diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 6d90d32..dd79de8 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -17,8 +17,15 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions; /// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults /// the session only in this case; with multiple subscribers FailFast degrades to a /// per-subscriber disconnect so one slow consumer never faults a session shared by others. +/// Always for internal subscribers (the dashboard mirror) because +/// excludes them from the external-subscriber count. /// -public delegate void SubscriberOverflowHandler(bool isOnlySubscriber); +/// +/// when the overflowing subscriber is the gateway-owned internal +/// dashboard mirror subscriber. The handler uses this to choose the correct metric label +/// ("dashboard-mirror" vs "grpc-event-stream"). +/// +public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInternal); /// /// Per-session event pump and fan-out. A single background task drains the @@ -440,9 +447,10 @@ public sealed class SessionEventDistributor : IAsyncDisposable // Observability + session-fault decision. Errors here must not stall the pump or // leave the subscriber attached, so the disconnect below runs regardless. + // Pass subscriber.IsInternal so the handler can choose the correct metric label. try { - _overflowHandler?.Invoke(isOnlySubscriber); + _overflowHandler?.Invoke(isOnlySubscriber, subscriber.IsInternal); } catch (Exception exception) { diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs index 327ff3b..b458791 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs @@ -204,12 +204,19 @@ public sealed class GatewaySessionDashboardMirrorTests return new WorkerEvent { Event = mxEvent }; } - private static async Task WaitUntilAsync(Func predicate) + private static async Task WaitUntilAsync(Func predicate, [CallerArgumentExpression(nameof(predicate))] string? condition = null) { using CancellationTokenSource cancellationTokenSource = new(TestTimeout); - while (!predicate()) + try { - await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token); + while (!predicate()) + { + await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token); + } + } + catch (OperationCanceledException) + { + Assert.Fail($"Timed out after {TestTimeout.TotalSeconds}s waiting for: {condition}"); } } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 835f19f..58ca40b 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -350,7 +350,7 @@ public sealed class SessionEventDistributorTests replayRetentionSeconds: 0, NullLogger.Instance, TimeProvider.System, - isOnlySubscriber => + (isOnlySubscriber, _) => { Interlocked.Increment(ref overflowCalls); Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber); @@ -415,7 +415,7 @@ public sealed class SessionEventDistributorTests replayRetentionSeconds: 0, NullLogger.Instance, TimeProvider.System, - isOnlySubscriber => + (isOnlySubscriber, _) => { if (!isOnlySubscriber) { @@ -458,6 +458,68 @@ public sealed class SessionEventDistributorTests Assert.Equal(11ul, afterOverflow.WorkerSequence); } + [Fact] + public async Task InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal() + { + // Issue 3: verifies that CountExternalSubscribers() excludes the internal dashboard + // subscriber, so a FailFast policy would NOT fault the session even when the internal + // subscriber is the ONLY registered subscriber. The overflow handler receives + // isOnlySubscriber==false (not true) because the overflowing subscriber is internal + // and is therefore excluded from the external-subscriber count. + Channel source = Channel.CreateUnbounded(); + int observedIsOnlySubscriberSet = 0; + bool observedIsOnlySubscriberValue = false; + bool observedIsInternalValue = false; + await using SessionEventDistributor distributor = new( + "session-internal-overflow", + ct => source.Reader.ReadAllAsync(ct), + subscriberQueueCapacity: 2, + replayBufferCapacity: 0, + replayRetentionSeconds: 0, + NullLogger.Instance, + TimeProvider.System, + (isOnlySubscriber, isInternal) => + { + Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber); + Volatile.Write(ref observedIsInternalValue, isInternal); + Volatile.Write(ref observedIsOnlySubscriberSet, 1); + }); + await distributor.StartAsync(CancellationToken.None); + + // Register ONLY an internal subscriber — no external subscriber is attached. + using IEventSubscriberLease internalLease = distributor.Register(isInternal: true); + + // Push enough events to overflow the capacity-2 internal subscriber channel. + for (ulong sequence = 1; sequence <= 10; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + } + + // The internal subscriber is disconnected with the overflow fault. + SessionManagerException fault = await Assert.ThrowsAsync( + async () => await DrainUntilFaultAsync(internalLease.Reader)); + Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode); + + // Wait for the handler to fire (it runs on the pump thread). + await Task.Run(async () => + { + using CancellationTokenSource cts = new(ReadTimeout); + while (Volatile.Read(ref observedIsOnlySubscriberSet) == 0) + { + await Task.Delay(10, cts.Token); + } + }); + + // isOnlySubscriber must be FALSE even though the internal subscriber was the ONLY + // subscriber — CountExternalSubscribers excludes it, so a FailFast policy on the + // external count would NOT fault the session. + Assert.True(Volatile.Read(ref observedIsOnlySubscriberSet) == 1, "Overflow handler should have fired."); + Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue), + "isOnlySubscriber must be false for an internal subscriber (CountExternalSubscribers excludes it)."); + Assert.True(Volatile.Read(ref observedIsInternalValue), + "isInternal must be true for a subscriber registered with isInternal: true."); + } + private static async Task DrainUntilFaultAsync(ChannelReader reader) { // Drains any buffered events, then surfaces the channel's completion fault (if any)