fix(dashboard): close StartDashboardMirror/DisposeAsync race; internal-overflow test + metric label

(1) GatewaySession.StartDashboardMirror: publish _dashboardMirrorLease and _dashboardMirrorTask
    atomically under one _syncRoot section; if the session is already Closing/Closed/Faulted,
    dispose the just-created lease and return without starting the mirror task so nothing is orphaned.
(2) WaitUntilAsync test helper: catch OperationCanceledException and call Assert.Fail with the
    timeout duration and predicate source text instead of letting the exception propagate raw.
(3) New SessionEventDistributorTests test
    InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal:
    verifies CountExternalSubscribers excludes the internal subscriber, so isOnlySubscriber==false
    even when the internal subscriber is the only registered subscriber.
(4) SubscriberOverflowHandler delegate gains isInternal parameter; overflow metric label is
    "dashboard-mirror" for internal subscribers and "grpc-event-stream" for external ones.
(5) DashboardEventBroadcaster.Publish: wrap SendAsync Task acquisition in try/catch so a
    synchronous throw cannot escape the never-throw Publish interface contract.
This commit is contained in:
Joseph Doherty
2026-06-15 15:02:36 -04:00
parent 1ea08c3b10
commit 2ead9bc200
5 changed files with 137 additions and 21 deletions
@@ -24,9 +24,21 @@ public sealed class DashboardEventBroadcaster(
return; return;
} }
Task send = hubContext.Clients // Wrap the Task acquisition in a try/catch so a hypothetical synchronous throw
.Group(EventsHub.GroupName(sessionId)) // from SendAsync (e.g. an implementation that throws before returning the Task)
.SendAsync(EventsHub.EventMessage, mxEvent); // cannot escape Publish. The interface contract is never-throw; fire-and-forget.
Task send;
try
{
send = hubContext.Clients
.Group(EventsHub.GroupName(sessionId))
.SendAsync(EventsHub.EventMessage, mxEvent);
}
catch (Exception ex)
{
logger.LogDebug(ex, "Dashboard event mirror to session {SessionId} threw synchronously.", sessionId);
return;
}
if (!send.IsCompletedSuccessfully) if (!send.IsCompletedSuccessfully)
{ {
@@ -454,6 +454,14 @@ public sealed class GatewaySession
// a subscriber is always present at pump start — the dashboard receives events with no // a subscriber is always present at pump start — the dashboard receives events with no
// gRPC subscriber attached, and the Task 4 "zero-subscriber drain into the void" hang // gRPC subscriber attached, and the Task 4 "zero-subscriber drain into the void" hang
// cannot occur. No-op when no dashboard broadcaster was supplied (unit tests). // cannot occur. No-op when no dashboard broadcaster was supplied (unit tests).
//
// Race-safety (Issue 1): _dashboardMirrorLease and _dashboardMirrorTask are published
// atomically in ONE lock section (this method's second), and DisposeAsync reads/nulls them under
// that same lock. After EnsureDistributorCreated/Register/StartPump (all outside _syncRoot
// to avoid lock inversion with the distributor's own lifecycle lock), we re-enter
// _syncRoot and check for concurrent disposal. If the session is already Closing/Closed/
// Faulted at that point, we dispose the just-created lease immediately and do NOT start
// the mirror task, so nothing is orphaned.
private void StartDashboardMirror() private void StartDashboardMirror()
{ {
IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster; IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster;
@@ -462,7 +470,6 @@ public sealed class GatewaySession
return; return;
} }
SessionEventDistributor distributor;
CancellationToken loopToken; CancellationToken loopToken;
lock (_syncRoot) lock (_syncRoot)
{ {
@@ -480,18 +487,31 @@ public sealed class GatewaySession
// internal subscriber BEFORE starting the pump. isInternal: true keeps the dashboard // internal subscriber BEFORE starting the pump. isInternal: true keeps the dashboard
// subscriber out of the single-subscriber overflow accounting, so a slow/broken // subscriber out of the single-subscriber overflow accounting, so a slow/broken
// dashboard mirror only disconnects itself and never faults the session. // dashboard mirror only disconnects itself and never faults the session.
distributor = EnsureDistributorCreated(out bool startNow); // These three calls are OUTSIDE _syncRoot to avoid holding it across
// EnsureDistributorCreated's own lock and StartAsync's Task.Run.
SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow);
IEventSubscriberLease lease = distributor.Register(isInternal: true); IEventSubscriberLease lease = distributor.Register(isInternal: true);
StartPumpIfRequested(distributor, startNow); StartPumpIfRequested(distributor, startNow);
// Publish BOTH the lease and the task atomically under one lock section so
// DisposeAsync always sees them in a consistent state: either both are set or
// both are null. If the session already started disposal before we got here,
// dispose the lease immediately instead of orphaning it.
lock (_syncRoot) lock (_syncRoot)
{ {
_dashboardMirrorLease = lease; if (_state is SessionState.Closing or SessionState.Closed or SessionState.Faulted)
} {
// Disposal already ran (or is in progress) — discard the just-created
// lease now so it is not orphaned. Do NOT launch the mirror task.
lease.Dispose();
return;
}
_dashboardMirrorTask = Task.Run( _dashboardMirrorLease = lease;
() => RunDashboardMirrorAsync(broadcaster, lease, loopToken), _dashboardMirrorTask = Task.Run(
CancellationToken.None); () => RunDashboardMirrorAsync(broadcaster, lease, loopToken),
CancellationToken.None);
}
} }
// Reads the internal dashboard subscriber's channel and publishes each RAW fanned event // Reads the internal dashboard subscriber's channel and publishes each RAW fanned event
@@ -556,7 +576,7 @@ public sealed class GatewaySession
// offending subscriber with an EventQueueOverflow fault; this handler adds the // offending subscriber with an EventQueueOverflow fault; this handler adds the
// observable side effects, preserving exactly what the pre-epic per-RPC overflow path // observable side effects, preserving exactly what the pre-epic per-RPC overflow path
// emitted: // emitted:
// - always record the queue-overflow metric; // - always record the queue-overflow metric, labeled by subscriber kind;
// - FailFast in the legacy single-subscriber case (isOnlySubscriber): fault the whole // - FailFast in the legacy single-subscriber case (isOnlySubscriber): fault the whole
// session and record the fault metric, matching back-compat behavior; // session and record the fault metric, matching back-compat behavior;
// - FailFast with multiple subscribers, or DisconnectSubscriber in any case: do NOT // - FailFast with multiple subscribers, or DisconnectSubscriber in any case: do NOT
@@ -564,16 +584,23 @@ public sealed class GatewaySession
// whole remedy, so other subscribers and the pump are unaffected. Multi-subscriber // whole remedy, so other subscribers and the pump are unaffected. Multi-subscriber
// FailFast deliberately degrades to a disconnect because faulting a shared session on // FailFast deliberately degrades to a disconnect because faulting a shared session on
// one slow consumer would punish healthy subscribers. // one slow consumer would punish healthy subscribers.
// The delegate now carries isInternal directly (Issue 4), so the metric label is chosen
// without any heuristic: "dashboard-mirror" for internal, "grpc-event-stream" for external.
private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy) private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy)
{ {
GatewayMetrics metrics = _eventStreaming.Metrics; GatewayMetrics metrics = _eventStreaming.Metrics;
return isOnlySubscriber => string sessionId = SessionId;
return (isOnlySubscriber, isInternal) =>
{ {
metrics.QueueOverflow("grpc-event-stream"); // Label the overflow metric by subscriber kind. The distributor passes isInternal
// directly, so no heuristic is needed to distinguish an internal overflow (the
// gateway-owned dashboard mirror) from an external one (a gRPC streaming client).
string label = isInternal ? "dashboard-mirror" : "grpc-event-stream";
metrics.QueueOverflow(label);
if (policy == EventBackpressurePolicy.FailFast && isOnlySubscriber) if (policy == EventBackpressurePolicy.FailFast && isOnlySubscriber)
{ {
MarkFaulted($"Session {SessionId} event stream queue overflowed."); MarkFaulted($"Session {sessionId} event stream queue overflowed.");
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString()); metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
} }
}; };
@@ -17,8 +17,15 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults /// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults
/// the session only in this case; with multiple subscribers FailFast degrades to a /// the session only in this case; with multiple subscribers FailFast degrades to a
/// per-subscriber disconnect so one slow consumer never faults a session shared by others. /// per-subscriber disconnect so one slow consumer never faults a session shared by others.
/// Always <see langword="false"/> for internal subscribers (the dashboard mirror) because
/// <see cref="SessionEventDistributor"/> excludes them from the external-subscriber count.
/// </param> /// </param>
public delegate void SubscriberOverflowHandler(bool isOnlySubscriber); /// <param name="isInternal">
/// <see langword="true"/> when the overflowing subscriber is the gateway-owned internal
/// dashboard mirror subscriber. The handler uses this to choose the correct metric label
/// (<c>"dashboard-mirror"</c> vs <c>"grpc-event-stream"</c>).
/// </param>
public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInternal);
/// <summary> /// <summary>
/// Per-session event pump and fan-out. A single background task drains the /// Per-session event pump and fan-out. A single background task drains the
@@ -440,9 +447,10 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// Observability + session-fault decision. Errors here must not stall the pump or // Observability + session-fault decision. Errors here must not stall the pump or
// leave the subscriber attached, so the disconnect below runs regardless. // leave the subscriber attached, so the disconnect below runs regardless.
// Pass subscriber.IsInternal so the handler can choose the correct metric label.
try try
{ {
_overflowHandler?.Invoke(isOnlySubscriber); _overflowHandler?.Invoke(isOnlySubscriber, subscriber.IsInternal);
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -204,12 +204,19 @@ public sealed class GatewaySessionDashboardMirrorTests
return new WorkerEvent { Event = mxEvent }; return new WorkerEvent { Event = mxEvent };
} }
private static async Task WaitUntilAsync(Func<bool> predicate) private static async Task WaitUntilAsync(Func<bool> predicate, [CallerArgumentExpression(nameof(predicate))] string? condition = null)
{ {
using CancellationTokenSource cancellationTokenSource = new(TestTimeout); using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
while (!predicate()) try
{ {
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token); while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
catch (OperationCanceledException)
{
Assert.Fail($"Timed out after {TestTimeout.TotalSeconds}s waiting for: {condition}");
} }
} }
@@ -350,7 +350,7 @@ public sealed class SessionEventDistributorTests
replayRetentionSeconds: 0, replayRetentionSeconds: 0,
NullLogger<SessionEventDistributor>.Instance, NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System, TimeProvider.System,
isOnlySubscriber => (isOnlySubscriber, _) =>
{ {
Interlocked.Increment(ref overflowCalls); Interlocked.Increment(ref overflowCalls);
Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber); Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
@@ -415,7 +415,7 @@ public sealed class SessionEventDistributorTests
replayRetentionSeconds: 0, replayRetentionSeconds: 0,
NullLogger<SessionEventDistributor>.Instance, NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System, TimeProvider.System,
isOnlySubscriber => (isOnlySubscriber, _) =>
{ {
if (!isOnlySubscriber) if (!isOnlySubscriber)
{ {
@@ -458,6 +458,68 @@ public sealed class SessionEventDistributorTests
Assert.Equal(11ul, afterOverflow.WorkerSequence); Assert.Equal(11ul, afterOverflow.WorkerSequence);
} }
[Fact]
public async Task InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal()
{
    // Issue 3: verifies that CountExternalSubscribers() excludes the internal dashboard
    // subscriber, so a FailFast policy would NOT fault the session even when the internal
    // subscriber is the ONLY registered subscriber. The overflow handler must observe
    // isOnlySubscriber == false (not true) and isInternal == true for that subscriber.
    Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();

    // The handler writes the two observed values first and raises the "set" flag LAST, so a
    // reader that sees the flag == 1 is guaranteed to see the values it published.
    int observedIsOnlySubscriberSet = 0;
    bool observedIsOnlySubscriberValue = false;
    bool observedIsInternalValue = false;

    await using SessionEventDistributor distributor = new(
        "session-internal-overflow",
        ct => source.Reader.ReadAllAsync(ct),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (isOnlySubscriber, isInternal) =>
        {
            Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
            Volatile.Write(ref observedIsInternalValue, isInternal);
            Volatile.Write(ref observedIsOnlySubscriberSet, 1);
        });
    await distributor.StartAsync(CancellationToken.None);

    // Register ONLY an internal subscriber — no external subscriber is attached.
    using IEventSubscriberLease internalLease = distributor.Register(isInternal: true);

    // Push enough events to overflow the capacity-2 internal subscriber channel.
    for (ulong sequence = 1; sequence <= 10; sequence++)
    {
        source.Writer.TryWrite(Event(sequence));
    }

    // The internal subscriber is disconnected with the overflow fault.
    SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
        async () => await DrainUntilFaultAsync(internalLease.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

    // Wait for the handler to fire (it runs on the pump thread). On timeout, fail with a
    // descriptive message instead of letting a raw OperationCanceledException propagate.
    using CancellationTokenSource handlerTimeout = new(ReadTimeout);
    try
    {
        while (Volatile.Read(ref observedIsOnlySubscriberSet) == 0)
        {
            await Task.Delay(10, handlerTimeout.Token);
        }
    }
    catch (OperationCanceledException)
    {
        Assert.Fail($"Timed out after {ReadTimeout} waiting for the overflow handler to fire.");
    }

    // isOnlySubscriber must be FALSE even though the internal subscriber was the ONLY
    // subscriber — CountExternalSubscribers excludes it, so a FailFast policy on the
    // external count would NOT fault the session.
    Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue),
        "isOnlySubscriber must be false for an internal subscriber (CountExternalSubscribers excludes it).");
    Assert.True(Volatile.Read(ref observedIsInternalValue),
        "isInternal must be true for a subscriber registered with isInternal: true.");
}
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader) private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
{ {
// Drains any buffered events, then surfaces the channel's completion fault (if any) // Drains any buffered events, then surfaces the channel's completion fault (if any)