From 281e00b3002d04524035d40bb472fe5e383483b0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 15:53:27 -0400 Subject: [PATCH] refactor(sessions): derive subscriber mode from session config; close Task 8 review nits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the per-call allowMultipleSubscribers param from AttachEventSubscriber and derive the mode internally from _eventStreaming.AllowMultipleEventSubscribers — the same source SessionEventDistributor uses for singleSubscriberMode — so the two can never structurally diverge. The maxSubscribers cap param is kept because MaxEventSubscribersPerSession lives in SessionOptions, which the session does not hold directly (only EventOptions flows through SessionEventStreaming). Other nits: - SubscriberCount XML doc clarifies it includes internal subscribers and differs from GatewaySession.ActiveEventSubscriberCount (external/gRPC only). - SingleSubscriberMode_LoneExternalOverflow test: add Assert.Equal(1, observedSet) guard before the value assertion so the test cannot pass vacuously if the handler never fired. - GatewayOptionsValidator.ValidateSessions: add explanatory code comment documenting why !AllowMultipleEventSubscribers && MaxEventSubscribersPerSession > 1 is NOT rejected as a hard error (the default config ships with this combination; the cap is simply unused in single-subscriber mode, not a behavior bug). - GatewaySession.DetachEventSubscriber: add Debug.Assert before the clamp so a genuine double-decrement surfaces in debug builds. --- .../Configuration/GatewayOptionsValidator.cs | 9 ++++ .../Grpc/EventStreamService.cs | 4 +- .../Sessions/GatewaySession.cs | 43 +++++++++++-------- .../Sessions/SessionEventDistributor.cs | 6 ++- .../GatewaySessionDashboardMirrorTests.cs | 2 +- .../Gateway/Sessions/GatewaySessionTests.cs | 36 +++++++--------- .../Sessions/SessionEventDistributorTests.cs | 3 ++ .../Gateway/Sessions/SessionManagerTests.cs | 2 +- 8 files changed, 62 insertions(+), 43 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs index dbaf7d6..53902cd 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs @@ -181,6 +181,15 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase 1 as a hard validation error here. The default + // SessionOptions ships with AllowMultipleEventSubscribers=false and + // MaxEventSubscribersPerSession=8; making those defaults a validation failure would + // break every deployment that has not explicitly set the cap. The cap is simply + // ignored in single-subscriber mode (AttachEventSubscriber derives effectiveCap=1), + // so the only practical consequence of the apparent inconsistency is a dead config + // knob, not incorrect behavior. } private static void ValidateEvents(EventOptions options, ValidationBuilder builder) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs index a3a8e06..6653aff 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs @@ -68,8 +68,10 @@ public sealed class EventStreamService( // No `using` here — subscriber.Dispose() is called exactly once in the finally // block below, which also disposes the reader. A `using` declaration would add a // second Dispose on the same path and double-decrement the session subscriber count. + // The subscriber mode (single vs. multi) is derived inside AttachEventSubscriber from + // the session's own SessionEventStreaming.AllowMultipleEventSubscribers field — the + // same source the distributor uses — so the two cannot diverge. IEventSubscriberLease subscriber = session.AttachEventSubscriber( - options.Value.Sessions.AllowMultipleEventSubscribers, options.Value.Sessions.MaxEventSubscribersPerSession); int streamQueueDepth = 0; diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index d00a4b4..56bf7d1 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; using ZB.MOM.WW.MxGateway.Contracts.Proto; @@ -684,31 +685,31 @@ public sealed class GatewaySession /// s for this subscriber. The returned lease, when disposed, /// unregisters the distributor subscriber AND decrements the active-subscriber count. /// - /// - /// When , single-subscriber mode: a second concurrent EXTERNAL - /// subscriber is rejected with . - /// When , multi-subscriber mode: up to - /// concurrent EXTERNAL subscribers are allowed; the - /// next attach is rejected with - /// . - /// /// /// Maximum concurrent external subscribers in multi-subscriber mode - /// (MxGateway:Sessions:MaxEventSubscribersPerSession). Ignored when - /// is (the effective - /// cap is then 1). The gateway-owned internal dashboard subscriber is registered - /// directly on the distributor and is NOT counted here, so it never consumes cap budget. + /// (MxGateway:Sessions:MaxEventSubscribersPerSession). Ignored when the + /// session is in single-subscriber mode (AllowMultipleEventSubscribers == false); + /// the effective cap is then 1. The gateway-owned internal dashboard subscriber is + /// registered directly on the distributor and is NOT counted here, so it never + /// consumes cap budget. /// /// - /// The count-check-and-increment runs atomically under _syncRoot, so two - /// concurrent attaches racing toward the cap can never both succeed past it. On - /// distributor-register failure the count is rolled back (see the catch below). + /// The subscriber mode is derived internally from + /// — the same source + /// the uses to gate its FailFast decision — so + /// the cap-enforcement mode and the distributor's singleSubscriberMode field + /// cannot diverge. The count-check-and-increment runs atomically under + /// _syncRoot, so two concurrent attaches racing toward the cap can never both + /// succeed past it. On distributor-register failure the count is rolled back (see the + /// catch below). /// - public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers, int maxSubscribers) + public IEventSubscriberLease AttachEventSubscriber(int maxSubscribers) { - // Effective cap: 1 in single-subscriber mode, otherwise the configured maximum - // (clamped to at least 1 so a misconfigured non-positive value can never deadlock - // attaches in multi-subscriber mode). + // Derive the mode from the same source the distributor uses so the two can never + // diverge. Effective cap: 1 in single-subscriber mode, otherwise the configured + // maximum (clamped to at least 1 so a misconfigured non-positive value can never + // deadlock attaches in multi-subscriber mode). + bool allowMultipleSubscribers = _eventStreaming.AllowMultipleEventSubscribers; int effectiveCap = allowMultipleSubscribers ? Math.Max(1, maxSubscribers) : 1; lock (_syncRoot) @@ -1493,6 +1494,10 @@ public sealed class GatewaySession { lock (_syncRoot) { + // Assert in debug so a genuine double-decrement (a logic error) surfaces + // loudly; the clamp below keeps release builds safe if it somehow fires. + Debug.Assert(_activeEventSubscriberCount > 0, + "DetachEventSubscriber called with _activeEventSubscriberCount already at 0 — possible double-dispose."); if (_activeEventSubscriberCount > 0) { _activeEventSubscriberCount--; diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 71fa5bf..89cc1ce 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -236,7 +236,11 @@ public sealed class SessionEventDistributor : IAsyncDisposable } /// - /// Gets the count of currently-registered subscribers. + /// Gets the count of currently-registered subscribers. This count INCLUDES internal + /// subscribers (e.g. the gateway-owned dashboard mirror registered via + /// Register(isInternal: true)), and therefore differs from + /// , which tracks only external + /// (gRPC) subscribers and excludes the internal dashboard subscriber. /// public int SubscriberCount => _subscribers.Count; diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs index 0c07034..2f2b186 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs @@ -151,7 +151,7 @@ public sealed class GatewaySessionDashboardMirrorTests session.MarkReady(); Assert.Equal(0, session.ActiveEventSubscriberCount); - using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 1); + using IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1); Assert.Equal(1, session.ActiveEventSubscriberCount); } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs index d4bc8a0..33756fc 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs @@ -168,8 +168,8 @@ public sealed class GatewaySessionTests /// completion and a client cancellation both fire at the same time — must /// decrement _activeEventSubscriberCount exactly once, never to −1. /// A negative count permanently blocks future subscribers because - /// AttachEventSubscriber(allowMultipleSubscribers:false) gates on - /// _activeEventSubscriberCount > 0. After both racing disposes finish, + /// AttachEventSubscriber gates on _activeEventSubscriberCount >= effectiveCap. + /// After both racing disposes finish, /// the count must be exactly 0 and a subsequent single-subscriber attach must /// succeed. /// @@ -186,8 +186,7 @@ public sealed class GatewaySessionTests for (int i = 0; i < Iterations; i++) { // Attach one subscriber; this increments _activeEventSubscriberCount to 1. - IEventSubscriberLease lease = session.AttachEventSubscriber( - allowMultipleSubscribers: false, maxSubscribers: 1); + IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1); // Race Concurrency threads all calling Dispose() on the same lease. // Only one must actually run DetachEventSubscriber. @@ -212,8 +211,7 @@ public sealed class GatewaySessionTests // Observable contract: a fresh single subscriber must now be attachable // (i.e., the guard _activeEventSubscriberCount > 0 is false). - IEventSubscriberLease next = session.AttachEventSubscriber( - allowMultipleSubscribers: false, maxSubscribers: 1); + IEventSubscriberLease next = session.AttachEventSubscriber(maxSubscribers: 1); next.Dispose(); Assert.Equal(0, session.ActiveEventSubscriberCount); } @@ -233,11 +231,10 @@ public sealed class GatewaySessionTests FakeWorkerClient workerClient = new(); GatewaySession session = CreateReadySessionWithEventStreaming(workerClient); - using IEventSubscriberLease first = session.AttachEventSubscriber( - allowMultipleSubscribers: false, maxSubscribers: 8); + using IEventSubscriberLease first = session.AttachEventSubscriber(maxSubscribers: 8); SessionManagerException exception = Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 8)); + () => session.AttachEventSubscriber(maxSubscribers: 8)); Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode); Assert.Equal(1, session.ActiveEventSubscriberCount); @@ -262,13 +259,13 @@ public sealed class GatewaySessionTests List leases = []; for (int i = 0; i < Cap; i++) { - leases.Add(session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + leases.Add(session.AttachEventSubscriber(maxSubscribers: Cap)); } Assert.Equal(Cap, session.ActiveEventSubscriberCount); SessionManagerException exception = Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode); Assert.Equal(Cap, session.ActiveEventSubscriberCount); @@ -304,14 +301,14 @@ public sealed class GatewaySessionTests List leases = []; for (int i = 0; i < Cap; i++) { - leases.Add(session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + leases.Add(session.AttachEventSubscriber(maxSubscribers: Cap)); } Assert.Equal(Cap, session.ActiveEventSubscriberCount); // The (cap+1)-th still fails: the dashboard mirror did not eat a slot. SessionManagerException exception = Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode); foreach (IEventSubscriberLease lease in leases) @@ -356,8 +353,7 @@ public sealed class GatewaySessionTests await gate.WaitAsync(testTimeout); try { - IEventSubscriberLease lease = session.AttachEventSubscriber( - allowMultipleSubscribers: true, maxSubscribers: Cap); + IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: Cap); leases[index] = lease; Interlocked.Increment(ref successCount); } @@ -411,13 +407,13 @@ public sealed class GatewaySessionTests workerClient, allowMultipleEventSubscribers: true); - IEventSubscriberLease a = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap); - IEventSubscriberLease b = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap); + IEventSubscriberLease a = session.AttachEventSubscriber(maxSubscribers: Cap); + IEventSubscriberLease b = session.AttachEventSubscriber(maxSubscribers: Cap); Assert.Equal(Cap, session.ActiveEventSubscriberCount); // At cap: next attach is rejected. Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); // Dispose one — and dispose it twice. The second dispose must not double-free. a.Dispose(); @@ -425,10 +421,10 @@ public sealed class GatewaySessionTests Assert.Equal(1, session.ActiveEventSubscriberCount); // Exactly one slot is free, so exactly one fresh attach succeeds. - using IEventSubscriberLease c = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap); + using IEventSubscriberLease c = session.AttachEventSubscriber(maxSubscribers: Cap); Assert.Equal(Cap, session.ActiveEventSubscriberCount); Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); b.Dispose(); diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 5db1b5e..0d5bc8d 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -565,6 +565,9 @@ public sealed class SessionEventDistributorTests } }); + // Guard: ensure the handler actually fired before asserting its observed value. + // Without this the test could pass vacuously if the overflow never triggered. + Assert.Equal(1, Volatile.Read(ref observedSet)); Assert.True(Volatile.Read(ref observedValue), "isOnlySubscriber must be true for a lone external subscriber in single-subscriber mode."); } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs index 5cf43dc..61dc32e 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -743,7 +743,7 @@ public sealed class SessionManagerTests GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None); DateTimeOffset now = DateTimeOffset.UtcNow; session.ExtendLease(now.AddSeconds(-1)); - using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 1); + using IDisposable eventSubscriber = session.AttachEventSubscriber(maxSubscribers: 1); int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None);