diff --git a/docs/Sessions.md b/docs/Sessions.md
index e5b8534..59d381a 100644
--- a/docs/Sessions.md
+++ b/docs/Sessions.md
@@ -150,7 +150,8 @@ public sealed record SessionCloseResult(
|------|---------|
| `SessionNotFound` | The session id is not in the registry. |
| `SessionNotReady` | The session or its `IWorkerClient` is not in `Ready` state. |
-| `EventSubscriberAlreadyActive` | A second event subscriber attached when only one is allowed. |
+| `EventSubscriberAlreadyActive` | A second event subscriber attached in single-subscriber mode (`AllowMultipleEventSubscribers` is `false`). |
+| `EventSubscriberLimitReached` | In multi-subscriber mode, an attach exceeded `MaxEventSubscribersPerSession` concurrent external subscribers. |
| `EventQueueOverflow` | Reserved for the worker event channel overflow path. |
| `SessionLimitExceeded` | `MaxSessions` is in use. |
| `OpenFailed` | `OpenSessionAsync` failed; the inner exception carries the cause. |
@@ -192,7 +193,9 @@ The order — fault, deregister, dispose, release slot, record metric, log, reth
While `Ready`, callers reach the worker through `SessionManager.InvokeAsync` or `ReadEventsAsync`. Both delegate to `GatewaySession`, which checks the state under lock and updates `LastClientActivityAt` on every invocation. `GatewaySession` also exposes typed bulk helpers (`AddItemBulkAsync`, `SubscribeBulkAsync`, etc.) that wrap `WorkerCommand` round-trips and translate non-`Ok` `ProtocolStatus` replies into `SessionManagerException` with `SessionNotReady`.
-Event streaming uses `AttachEventSubscriber` which returns a disposable lease. When `allowMultipleSubscribers` is false the second attach throws `EventSubscriberAlreadyActive`; this prevents two gRPC streams from racing on the same worker event channel. Active event subscribers keep the session lease from expiring until the stream is disposed.
+Event streaming uses `AttachEventSubscriber` which returns a disposable lease. When `allowMultipleSubscribers` is false (single-subscriber mode) a second attach throws `EventSubscriberAlreadyActive`; this prevents two gRPC streams from racing on the same worker event channel. When it is true, up to `MaxEventSubscribersPerSession` concurrent external subscribers are allowed and the next attach throws `EventSubscriberLimitReached`. The count-check-and-increment is atomic under the session lock, so concurrent attaches can never exceed the cap. The gateway-owned internal dashboard mirror subscriber is registered directly on the distributor and does not count toward the cap. Active event subscribers keep the session lease from expiring until the stream is disposed.
+
+`FailFast` event backpressure faults the whole session only in single-subscriber mode; in multi-subscriber mode it degrades to a per-subscriber disconnect so one slow consumer never faults a session shared by others. The session passes its mode to the `SessionEventDistributor` at construction, so this decision is made on the fixed mode rather than a live subscriber-count snapshot.
Sessions open with `MxGateway:Sessions:DefaultLeaseSeconds` (default 1800) added to the open timestamp. Unary client activity refreshes the lease by the same duration. `ExtendLease` and `IsLeaseExpired` cooperate with `SessionManager.CloseExpiredLeasesAsync`, which iterates a registry snapshot and closes any session whose lease has expired with `LeaseExpiredReason`. `SessionLeaseMonitorHostedService` runs that sweep every `MxGateway:Sessions:LeaseSweepIntervalSeconds` seconds (default 30).
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs
index 6dfab3f..a3a8e06 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs
@@ -69,7 +69,8 @@ public sealed class EventStreamService(
// block below, which also disposes the reader. A `using` declaration would add a
// second Dispose on the same path and double-decrement the session subscriber count.
IEventSubscriberLease subscriber = session.AttachEventSubscriber(
- options.Value.Sessions.AllowMultipleEventSubscribers);
+ options.Value.Sessions.AllowMultipleEventSubscribers,
+ options.Value.Sessions.MaxEventSubscribersPerSession);
int streamQueueDepth = 0;
ulong afterWorkerSequence = request.AfterWorkerSequence;
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/MxAccessGatewayService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/MxAccessGatewayService.cs
index a365260..e88fbae 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/MxAccessGatewayService.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/MxAccessGatewayService.cs
@@ -949,6 +949,7 @@ public sealed class MxAccessGatewayService(
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
+ SessionManagerErrorCode.EventSubscriberLimitReached => StatusCode.ResourceExhausted,
SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
index 086f2df..d00a4b4 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
@@ -419,7 +419,8 @@ public sealed class GatewaySession
eventOptions.ReplayRetentionSeconds,
_eventStreaming.DistributorLogger,
_eventStreaming.TimeProvider,
- CreateOverflowHandler(eventOptions.BackpressurePolicy));
+ CreateOverflowHandler(eventOptions.BackpressurePolicy),
+ singleSubscriberMode: !_eventStreaming.AllowMultipleEventSubscribers);
}
startNow = false;
@@ -680,14 +681,36 @@ public sealed class GatewaySession
///
/// Attaches an event subscriber and returns a lease whose
/// reads the fanned public
- /// s for this subscriber. The single-subscriber guard
- /// (Tasks 7/8 relax it) is unchanged: with multi-subscriber disabled a second
- /// attach is rejected. The returned lease, when disposed, unregisters the
- /// distributor subscriber AND decrements the active-subscriber count.
+ /// s for this subscriber. The returned lease, when disposed,
+ /// unregisters the distributor subscriber AND decrements the active-subscriber count.
///
- /// If true, allows multiple concurrent event subscribers.
- public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers)
+ ///
+ /// When , single-subscriber mode: a second concurrent EXTERNAL
+ /// subscriber is rejected with .
+ /// When , multi-subscriber mode: up to
+ /// concurrent EXTERNAL subscribers are allowed; the
+ /// next attach is rejected with
+ /// .
+ ///
+ ///
+ /// Maximum concurrent external subscribers in multi-subscriber mode
+ /// (MxGateway:Sessions:MaxEventSubscribersPerSession). Ignored when
+ /// is (the effective
+ /// cap is then 1). The gateway-owned internal dashboard subscriber is registered
+ /// directly on the distributor and is NOT counted here, so it never consumes cap budget.
+ ///
+ ///
+ /// The count-check-and-increment runs atomically under _syncRoot, so two
+ /// concurrent attaches racing toward the cap can never both succeed past it. On
+ /// distributor-register failure the count is rolled back (see the catch below).
+ ///
+ public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers, int maxSubscribers)
{
+ // Effective cap: 1 in single-subscriber mode, otherwise the configured maximum
+ // (clamped to at least 1 so a misconfigured non-positive value can never deadlock
+ // attaches in multi-subscriber mode).
+ int effectiveCap = allowMultipleSubscribers ? Math.Max(1, maxSubscribers) : 1;
+
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
@@ -697,11 +720,15 @@ public sealed class GatewaySession
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
}
- if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
+ if (_activeEventSubscriberCount >= effectiveCap)
{
- throw new SessionManagerException(
- SessionManagerErrorCode.EventSubscriberAlreadyActive,
- $"Session {SessionId} already has an active event stream subscriber.");
+ throw allowMultipleSubscribers
+ ? new SessionManagerException(
+ SessionManagerErrorCode.EventSubscriberLimitReached,
+ $"Session {SessionId} has reached its maximum of {effectiveCap} concurrent event stream subscribers.")
+ : new SessionManagerException(
+ SessionManagerErrorCode.EventSubscriberAlreadyActive,
+ $"Session {SessionId} already has an active event stream subscriber.");
}
_activeEventSubscriberCount++;
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
index dd79de8..71fa5bf 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
@@ -13,12 +13,16 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// regardless of what the handler does.
///
///
-/// when the overflowing subscriber is the sole registered
-/// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults
-/// the session only in this case; with multiple subscribers FailFast degrades to a
-/// per-subscriber disconnect so one slow consumer never faults a session shared by others.
-/// Always for internal subscribers (the dashboard mirror) because
-/// excludes them from the external-subscriber count.
+/// when FailFast is allowed to fault the whole session for this
+/// overflow. As of Task 8 this is gated on the SESSION MODE, not a live count: it is
+/// only for an external subscriber in single-subscriber mode
+/// (AllowMultipleEventSubscribers == false), where at most one external subscriber
+/// can ever exist. In multi-subscriber mode it is always , so
+/// FailFast degrades to a per-subscriber disconnect and one slow consumer never faults a
+/// session shared by others; gating on the fixed mode also removes the Task 5 race where a
+/// concurrent registration could make a count snapshot falsely report a sole subscriber.
+/// Always for internal subscribers (the dashboard mirror) so a
+/// slow/broken dashboard can never fault the session.
///
///
/// when the overflowing subscriber is the gateway-owned internal
@@ -40,8 +44,10 @@ public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInt
/// policy (Task 5) is implemented here: a slow subscriber overflows only its own
/// bounded channel and the pump applies the policy to that subscriber alone (see
/// and OnSubscriberOverflow), leaving
-/// the pump, the session, and other subscribers running. The class does not yet
-/// remove the single-subscriber guard (Tasks 7/8). The ring buffer supports capacity
+/// the pump, the session, and other subscribers running. Task 8 made the
+/// FailFast-faults-session decision mode-gated: it fires only in single-subscriber
+/// mode (singleSubscriberMode), so multi-subscriber FailFast always degrades to
+/// a per-subscriber disconnect — see OnSubscriberOverflow. The ring buffer supports capacity
/// eviction (oldest entry dropped when the count exceeds
/// replayBufferCapacity) and age eviction (entries older than
/// replayRetentionSeconds dropped on the next append or query), and is
@@ -83,6 +89,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
private readonly string _sessionId;
private readonly Func> _eventSourceFactory;
private readonly int _subscriberQueueCapacity;
+ private readonly bool _singleSubscriberMode;
private readonly SubscriberOverflowHandler? _overflowHandler;
private readonly TimeSpan _shutdownTimeout;
private readonly ILogger _logger;
@@ -134,7 +141,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
Func> eventSourceFactory,
int subscriberQueueCapacity,
ILogger logger,
- SubscriberOverflowHandler? overflowHandler = null)
+ SubscriberOverflowHandler? overflowHandler = null,
+ bool singleSubscriberMode = true)
: this(
sessionId,
eventSourceFactory,
@@ -143,7 +151,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
replayRetentionSeconds: 0,
logger,
TimeProvider.System,
- overflowHandler)
+ overflowHandler,
+ singleSubscriberMode)
{
}
@@ -181,6 +190,17 @@ public sealed class SessionEventDistributor : IAsyncDisposable
/// handler. When (unit/skeleton use) the offending subscriber is
/// still disconnected but no metric/fault side effect runs.
///
+ ///
+ /// when the owning session is in single-subscriber mode
+ /// (AllowMultipleEventSubscribers == false). This gates the FailFast
+ /// session-fault decision in OnSubscriberOverflow: an external subscriber that
+ /// overflows reports isOnlySubscriber == true (legacy FailFast faults the
+ /// session) ONLY in single-subscriber mode. In multi-subscriber mode it is always
+ /// , so FailFast degrades to a per-subscriber disconnect and a
+ /// transient registration race can never falsely fault a shared session (Task 8;
+ /// resolves the Task 5 REVISIT race). Defaults to so existing
+ /// call sites and unit tests keep legacy single-subscriber FailFast behavior.
+ ///
public SessionEventDistributor(
string sessionId,
Func> eventSourceFactory,
@@ -189,7 +209,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
double replayRetentionSeconds,
ILogger logger,
TimeProvider timeProvider,
- SubscriberOverflowHandler? overflowHandler = null)
+ SubscriberOverflowHandler? overflowHandler = null,
+ bool singleSubscriberMode = true)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentNullException.ThrowIfNull(eventSourceFactory);
@@ -202,6 +223,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
_sessionId = sessionId;
_eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity;
+ _singleSubscriberMode = singleSubscriberMode;
_overflowHandler = overflowHandler;
_shutdownTimeout = DefaultShutdownTimeout;
_replayBufferCapacity = replayBufferCapacity;
@@ -416,28 +438,25 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// slow consumer must not fault a session shared by other healthy subscribers.
private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
{
- // Snapshot whether this is the sole subscriber BEFORE we unregister it. This drives
- // the FailFast-fault-session-vs-disconnect decision: FailFast only faults the session
- // when the overflowing subscriber is the sole subscriber.
+ // Decide whether FailFast may fault the whole session for this overflow. This is the
+ // "isOnlySubscriber" signal the legacy single-subscriber FailFast path keys on.
//
- // This snapshot is safe in v1 because AllowMultipleEventSubscribers=false is enforced
- // by the validator and the single-subscriber guard in AttachEventSubscriber — a
- // concurrent second registration is impossible, so the false-FailFast race (two
- // subscribers, one overflows, Count reads as 1 after the other concurrently unregisters,
- // FailFast wrongly faults the session) cannot occur today.
+ // Task 8 resolution of the Task 5/7 REVISIT race: gate this on the SESSION MODE
+ // (_singleSubscriberMode), NOT on a live count snapshot. The old
+ // `CountExternalSubscribers() == 1` snapshot raced once multi-subscriber became real —
+ // a concurrent second registration/unregistration could make the count read as 1 with
+ // two subscribers actually present, producing a false FailFast that faults a shared
+ // session. The mode is fixed for the session's lifetime, so reading it is race-free:
+ // - single-subscriber mode: at most one external subscriber can ever exist (the
+ // AttachEventSubscriber guard enforces it), so an overflowing external subscriber
+ // IS the sole subscriber — preserve the legacy FailFast session-fault behavior.
+ // - multi-subscriber mode: never fault the shared session; FailFast degrades to a
+ // per-subscriber disconnect so one slow consumer cannot punish healthy ones.
//
- // REVISIT (Task 7/8): when multi-subscriber is enabled the guard is removed and the
- // race window opens — a concurrent second registration could cause Count to read as 1
- // here even with two subscribers, producing a false FailFast that faults a shared
- // session. Resolve before enabling multi-subscriber.
- //
- // Task 6: the gateway-owned internal dashboard subscriber is excluded from this
- // accounting. (a) An internal subscriber that overflows is NEVER the "only subscriber"
- // — a slow/broken dashboard must never fault the session, only disconnect its own
- // mirror. (b) Internal subscribers are excluded from the count, so a lone external
- // gRPC subscriber still reports isOnlySubscriber==true and preserves the legacy
- // FailFast session-fault behavior even while the dashboard mirror is attached.
- bool isOnlySubscriber = !subscriber.IsInternal && CountExternalSubscribers() == 1;
+ // Task 6: the gateway-owned internal dashboard subscriber is excluded — an internal
+ // subscriber that overflows is NEVER the "only subscriber", so a slow/broken dashboard
+ // can only disconnect its own mirror and never fault the session.
+ bool isOnlySubscriber = !subscriber.IsInternal && _singleSubscriberMode;
_logger.LogDebug(
"Event distributor disconnecting subscriber {SubscriberId} in session {SessionId} after queue overflow (worker sequence {WorkerSequence}).",
@@ -473,22 +492,6 @@ public sealed class SessionEventDistributor : IAsyncDisposable
}
}
- // Counts external (non-internal) subscribers. Drives the isOnlySubscriber FailFast
- // decision so the gateway-owned internal dashboard subscriber never inflates the count.
- private int CountExternalSubscribers()
- {
- int count = 0;
- foreach (Subscriber subscriber in _subscribers.Values)
- {
- if (!subscriber.IsInternal)
- {
- count++;
- }
- }
-
- return count;
- }
-
private void CompleteAllSubscribers(Exception? error)
{
foreach (Subscriber subscriber in _subscribers.Values)
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs
index f7e1120..460bebf 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs
@@ -38,13 +38,24 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// EventsHub group regardless of whether a gRPC client is streaming. When null
/// (unit tests that don't exercise the dashboard mirror) no mirror is started.
///
+///
+/// The session's effective multi-subscriber mode (Task 8). Carried here so the session
+/// can pass it to its at construction — the
+/// distributor is created at MarkReady (for the dashboard mirror) before any gRPC
+/// subscriber attaches, so the mode cannot be learned from a later
+/// AttachEventSubscriber call. The distributor gates its FailFast session-fault
+/// decision on this mode (single-subscriber only) instead of a live count snapshot,
+/// closing the Task 5 false-FailFast race. Defaults to
+/// (single-subscriber) so existing call sites and unit tests are unchanged.
+///
public sealed record SessionEventStreaming(
MxAccessGrpcMapper Mapper,
EventOptions EventOptions,
ILogger DistributorLogger,
TimeProvider TimeProvider,
GatewayMetrics Metrics,
- IDashboardEventBroadcaster? DashboardBroadcaster = null)
+ IDashboardEventBroadcaster? DashboardBroadcaster = null,
+ bool AllowMultipleEventSubscribers = false)
{
///
/// Defaults used when a session is constructed without explicit streaming
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
index 06cd097..22414e3 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
@@ -461,7 +461,8 @@ public sealed class SessionManager : ISessionManager
_distributorLogger,
_timeProvider,
_metrics,
- _dashboardEventBroadcaster);
+ _dashboardEventBroadcaster,
+ _options.Sessions.AllowMultipleEventSubscribers);
return new GatewaySession(
sessionId,
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManagerErrorCode.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManagerErrorCode.cs
index 215c9fc..b1b4235 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManagerErrorCode.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManagerErrorCode.cs
@@ -5,6 +5,7 @@ public enum SessionManagerErrorCode
SessionNotFound,
SessionNotReady,
EventSubscriberAlreadyActive,
+ EventSubscriberLimitReached,
EventQueueOverflow,
SessionLimitExceeded,
OpenFailed,
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs
index b458791..0c07034 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs
@@ -151,7 +151,7 @@ public sealed class GatewaySessionDashboardMirrorTests
session.MarkReady();
Assert.Equal(0, session.ActiveEventSubscriberCount);
- using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false);
+ using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 1);
Assert.Equal(1, session.ActiveEventSubscriberCount);
}
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs
index 45363ae..d4bc8a0 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs
@@ -2,10 +2,12 @@ using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
+using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
+using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
@@ -185,7 +187,7 @@ public sealed class GatewaySessionTests
{
// Attach one subscriber; this increments _activeEventSubscriberCount to 1.
IEventSubscriberLease lease = session.AttachEventSubscriber(
- allowMultipleSubscribers: false);
+ allowMultipleSubscribers: false, maxSubscribers: 1);
// Race Concurrency threads all calling Dispose() on the same lease.
// Only one must actually run DetachEventSubscriber.
@@ -211,7 +213,7 @@ public sealed class GatewaySessionTests
// Observable contract: a fresh single subscriber must now be attachable
// (i.e., the guard _activeEventSubscriberCount > 0 is false).
IEventSubscriberLease next = session.AttachEventSubscriber(
- allowMultipleSubscribers: false);
+ allowMultipleSubscribers: false, maxSubscribers: 1);
next.Dispose();
Assert.Equal(0, session.ActiveEventSubscriberCount);
}
@@ -220,6 +222,220 @@ public sealed class GatewaySessionTests
await session.DisposeAsync();
}
+ ///
+ /// Task 8 regression. Single-subscriber mode rejects a SECOND concurrent external
+ /// attach with —
+ /// the legacy guard is preserved unchanged when multi-subscriber is disabled.
+ ///
+ [Fact]
+ public async Task AttachEventSubscriber_SingleMode_SecondAttachThrowsAlreadyActive()
+ {
+ FakeWorkerClient workerClient = new();
+ GatewaySession session = CreateReadySessionWithEventStreaming(workerClient);
+
+ using IEventSubscriberLease first = session.AttachEventSubscriber(
+ allowMultipleSubscribers: false, maxSubscribers: 8);
+
+ SessionManagerException exception = Assert.Throws(
+ () => session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 8));
+ Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode);
+ Assert.Equal(1, session.ActiveEventSubscriberCount);
+
+ await session.CloseAsync("test-done", CancellationToken.None);
+ await session.DisposeAsync();
+ }
+
+ ///
+ /// Task 8. Multi-subscriber mode allows exactly cap concurrent external
+ /// subscribers; the (cap+1)-th attach throws
+ /// .
+ ///
+ [Fact]
+ public async Task AttachEventSubscriber_MultiMode_AttachesUpToCapThenThrowsLimitReached()
+ {
+ const int Cap = 4;
+ FakeWorkerClient workerClient = new();
+ GatewaySession session = CreateReadySessionWithEventStreaming(
+ workerClient,
+ allowMultipleEventSubscribers: true);
+
+ List leases = [];
+ for (int i = 0; i < Cap; i++)
+ {
+ leases.Add(session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap));
+ }
+
+ Assert.Equal(Cap, session.ActiveEventSubscriberCount);
+
+ SessionManagerException exception = Assert.Throws(
+ () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap));
+ Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode);
+ Assert.Equal(Cap, session.ActiveEventSubscriberCount);
+
+ foreach (IEventSubscriberLease lease in leases)
+ {
+ lease.Dispose();
+ }
+
+ await session.CloseAsync("test-done", CancellationToken.None);
+ await session.DisposeAsync();
+ }
+
+ ///
+ /// Task 8. The gateway-owned INTERNAL dashboard subscriber must NOT consume cap
+ /// budget: with the dashboard mirror running, the full cap of external subscribers is
+ /// still attachable.
+ ///
+ [Fact]
+ public async Task AttachEventSubscriber_MultiMode_DashboardMirrorDoesNotConsumeCap()
+ {
+ const int Cap = 3;
+ FakeWorkerClient workerClient = new();
+ RecordingDashboardEventBroadcaster broadcaster = new();
+ GatewaySession session = CreateReadySessionWithEventStreaming(
+ workerClient,
+ allowMultipleEventSubscribers: true,
+ dashboardBroadcaster: broadcaster);
+
+ // The internal dashboard mirror registered on MarkReady is NOT counted as an external
+ // subscriber, so the external active count starts at zero.
+ Assert.Equal(0, session.ActiveEventSubscriberCount);
+
+ List leases = [];
+ for (int i = 0; i < Cap; i++)
+ {
+ leases.Add(session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap));
+ }
+
+ Assert.Equal(Cap, session.ActiveEventSubscriberCount);
+
+ // The (cap+1)-th still fails: the dashboard mirror did not eat a slot.
+ SessionManagerException exception = Assert.Throws(
+ () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap));
+ Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode);
+
+ foreach (IEventSubscriberLease lease in leases)
+ {
+ lease.Dispose();
+ }
+
+ await session.CloseAsync("test-done", CancellationToken.None);
+ await session.DisposeAsync();
+ }
+
+ ///
+ /// Task 8 concurrency. Many concurrent attaches in multi-subscriber mode must never
+ /// exceed the cap: the count-check-and-increment is atomic under _syncRoot, so
+ /// exactly cap attaches succeed and the rest throw
+ /// . The observed
+ /// count never goes above the cap.
+ ///
+ [Fact]
+ public async Task AttachEventSubscriber_MultiMode_ConcurrentAttaches_NeverExceedCap()
+ {
+ const int Cap = 5;
+ const int Attempts = 32;
+ TimeSpan testTimeout = TimeSpan.FromSeconds(10);
+
+ FakeWorkerClient workerClient = new();
+ GatewaySession session = CreateReadySessionWithEventStreaming(
+ workerClient,
+ allowMultipleEventSubscribers: true);
+
+ using SemaphoreSlim gate = new(0);
+ int successCount = 0;
+ int limitReachedCount = 0;
+ int maxObservedCount = 0;
+ IEventSubscriberLease?[] leases = new IEventSubscriberLease?[Attempts];
+ Task[] tasks = new Task[Attempts];
+ for (int t = 0; t < Attempts; t++)
+ {
+ int index = t;
+ tasks[index] = Task.Run(async () =>
+ {
+ await gate.WaitAsync(testTimeout);
+ try
+ {
+ IEventSubscriberLease lease = session.AttachEventSubscriber(
+ allowMultipleSubscribers: true, maxSubscribers: Cap);
+ leases[index] = lease;
+ Interlocked.Increment(ref successCount);
+ }
+ catch (SessionManagerException exception)
+ when (exception.ErrorCode == SessionManagerErrorCode.EventSubscriberLimitReached)
+ {
+ Interlocked.Increment(ref limitReachedCount);
+ }
+
+ int observed = session.ActiveEventSubscriberCount;
+ int previousMax;
+ do
+ {
+ previousMax = Volatile.Read(ref maxObservedCount);
+ if (observed <= previousMax)
+ {
+ break;
+ }
+ }
+ while (Interlocked.CompareExchange(ref maxObservedCount, observed, previousMax) != previousMax);
+ });
+ }
+
+ gate.Release(Attempts);
+ await Task.WhenAll(tasks).WaitAsync(testTimeout);
+
+ Assert.Equal(Cap, successCount);
+ Assert.Equal(Attempts - Cap, limitReachedCount);
+ Assert.Equal(Cap, session.ActiveEventSubscriberCount);
+ Assert.True(maxObservedCount <= Cap, $"Observed count {maxObservedCount} exceeded cap {Cap}.");
+
+ foreach (IEventSubscriberLease? lease in leases)
+ {
+ lease?.Dispose();
+ }
+
+ await session.CloseAsync("test-done", CancellationToken.None);
+ await session.DisposeAsync();
+ }
+
+ ///
+ /// Task 8. Disposing a subscriber frees a cap slot so a fresh attach succeeds, and a
+ /// double-dispose does not double-free the slot (count integrity preserved).
+ ///
+ [Fact]
+ public async Task AttachEventSubscriber_MultiMode_DisposeFreesSlotAndDoubleDisposeIsIdempotent()
+ {
+ const int Cap = 2;
+ FakeWorkerClient workerClient = new();
+ GatewaySession session = CreateReadySessionWithEventStreaming(
+ workerClient,
+ allowMultipleEventSubscribers: true);
+
+ IEventSubscriberLease a = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap);
+ IEventSubscriberLease b = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap);
+ Assert.Equal(Cap, session.ActiveEventSubscriberCount);
+
+ // At cap: next attach is rejected.
+ Assert.Throws(
+ () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap));
+
+ // Dispose one — and dispose it twice. The second dispose must not double-free.
+ a.Dispose();
+ a.Dispose();
+ Assert.Equal(1, session.ActiveEventSubscriberCount);
+
+ // Exactly one slot is free, so exactly one fresh attach succeeds.
+ using IEventSubscriberLease c = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap);
+ Assert.Equal(Cap, session.ActiveEventSubscriberCount);
+ Assert.Throws(
+ () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap));
+
+ b.Dispose();
+
+ await session.CloseAsync("test-done", CancellationToken.None);
+ await session.DisposeAsync();
+ }
+
private static GatewaySession CreateReadySession(IWorkerClient workerClient)
{
GatewaySession session = new(
@@ -241,7 +457,10 @@ public sealed class GatewaySessionTests
return session;
}
- private static GatewaySession CreateReadySessionWithEventStreaming(IWorkerClient workerClient)
+ private static GatewaySession CreateReadySessionWithEventStreaming(
+ IWorkerClient workerClient,
+ bool allowMultipleEventSubscribers = false,
+ IDashboardEventBroadcaster? dashboardBroadcaster = null)
{
GatewaySession session = new(
sessionId: "session-test-concurrent",
@@ -262,7 +481,9 @@ public sealed class GatewaySessionTests
new EventOptions { QueueCapacity = 8 },
NullLogger.Instance,
TimeProvider.System,
- new GatewayMetrics()));
+ new GatewayMetrics(),
+ dashboardBroadcaster,
+ allowMultipleEventSubscribers));
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
index 58ca40b..5db1b5e 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
@@ -355,7 +355,8 @@ public sealed class SessionEventDistributorTests
Interlocked.Increment(ref overflowCalls);
Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
Volatile.Write(ref observedIsOnlySubscriberSet, 1);
- });
+ },
+ singleSubscriberMode: false);
await distributor.StartAsync(CancellationToken.None);
// Slow subscriber: registered but never read, so its capacity-2 channel fills.
@@ -377,7 +378,7 @@ public sealed class SessionEventDistributorTests
async () => await DrainUntilFaultAsync(slow.Reader));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
- // Two subscribers were registered at overflow time, so isOnlySubscriber is false.
+ // Multi-subscriber mode, so isOnlySubscriber is always false (Task 8 mode-gating).
// Use Interlocked.Read / Volatile.Read so the test-thread reads are ordered after the
// pump-thread writes, avoiding a data race by the C# memory model.
Assert.Equal(1, Volatile.Read(ref overflowCalls));
@@ -395,15 +396,12 @@ public sealed class SessionEventDistributorTests
public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
{
// Distributor-level pin for "FailFast with multiple subscribers degrades to
- // disconnect-only (no session fault)": when the overflowing subscriber is NOT the
- // sole subscriber, isOnlySubscriber is false, so a FailFast-wired handler must NOT
- // fault the session. This test drives the distributor directly (without GatewaySession)
- // with two subscribers and a FailFast-style overflow handler seam, overflows the slow
- // one, and asserts (a) isOnlySubscriber==false, (b) the other subscriber keeps
- // receiving, and (c) the pump keeps running — all without a GatewaySession.
- //
- // TODO(Task 8): add a GatewaySession-level "session stays Ready" assertion once
- // multi-subscriber config is enabled by the Tasks 7/8 validator/guard change.
+ // disconnect-only (no session fault)": in multi-subscriber mode isOnlySubscriber is
+ // always false (Task 8 mode-gating), so a FailFast-wired handler must NOT fault the
+ // session. This test drives the distributor directly (without GatewaySession) in
+ // multi-subscriber mode with two subscribers and a FailFast-style overflow handler
+ // seam, overflows the slow one, and asserts (a) isOnlySubscriber==false, (b) the other
+ // subscriber keeps receiving, and (c) the pump keeps running.
Channel source = Channel.CreateUnbounded();
bool handlerFiredWithFalse = false;
bool sessionFaultWouldBeCalled = false; // tracks if a FailFast path would fault
@@ -427,7 +425,8 @@ public sealed class SessionEventDistributorTests
// Single-subscriber: FailFast would fault the session — must not happen here.
Volatile.Write(ref sessionFaultWouldBeCalled, true);
}
- });
+ },
+ singleSubscriberMode: false);
await distributor.StartAsync(CancellationToken.None);
// Slow subscriber: never reads, so capacity-2 channel overflows quickly.
@@ -520,6 +519,56 @@ public sealed class SessionEventDistributorTests
"isInternal must be true for a subscriber registered with isInternal: true.");
}
+ [Fact]
+ public async Task SingleSubscriberMode_LoneExternalOverflow_HandlerSeesIsOnlySubscriberTrue()
+ {
+ // Task 8 mode-gating: in single-subscriber mode a lone external subscriber that
+ // overflows reports isOnlySubscriber==true, so the legacy FailFast session-fault path
+ // is preserved. The decision is gated on the fixed session mode, NOT a live count, so
+ // it is race-free.
+ Channel source = Channel.CreateUnbounded();
+ int observedSet = 0;
+ bool observedValue = false;
+ await using SessionEventDistributor distributor = new(
+ "session-single-sub",
+ ct => source.Reader.ReadAllAsync(ct),
+ subscriberQueueCapacity: 2,
+ replayBufferCapacity: 0,
+ replayRetentionSeconds: 0,
+ NullLogger.Instance,
+ TimeProvider.System,
+ (isOnlySubscriber, _) =>
+ {
+ Volatile.Write(ref observedValue, isOnlySubscriber);
+ Volatile.Write(ref observedSet, 1);
+ },
+ singleSubscriberMode: true);
+ await distributor.StartAsync(CancellationToken.None);
+
+ using IEventSubscriberLease external = distributor.Register();
+
+ for (ulong sequence = 1; sequence <= 10; sequence++)
+ {
+ source.Writer.TryWrite(Event(sequence));
+ }
+
+ SessionManagerException fault = await Assert.ThrowsAsync(
+ async () => await DrainUntilFaultAsync(external.Reader));
+ Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
+
+ await Task.Run(async () =>
+ {
+ using CancellationTokenSource cts = new(ReadTimeout);
+ while (Volatile.Read(ref observedSet) == 0)
+ {
+ await Task.Delay(10, cts.Token);
+ }
+ });
+
+ Assert.True(Volatile.Read(ref observedValue),
+ "isOnlySubscriber must be true for a lone external subscriber in single-subscriber mode.");
+ }
+
private static async Task DrainUntilFaultAsync(ChannelReader reader)
{
// Drains any buffered events, then surfaces the channel's completion fault (if any)
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
index cd527d9..5cf43dc 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs
@@ -743,7 +743,7 @@ public sealed class SessionManagerTests
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
DateTimeOffset now = DateTimeOffset.UtcNow;
session.ExtendLease(now.AddSeconds(-1));
- using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false);
+ using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 1);
int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None);