diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs index dbaf7d6..53902cd 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs @@ -181,6 +181,15 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase 1 as a hard validation error here. The default + // SessionOptions ships with AllowMultipleEventSubscribers=false and + // MaxEventSubscribersPerSession=8; making those defaults a validation failure would + // break every deployment that has not explicitly set the cap. The cap is simply + // ignored in single-subscriber mode (AttachEventSubscriber derives effectiveCap=1), + // so the only practical consequence of the apparent inconsistency is a dead config + // knob, not incorrect behavior. } private static void ValidateEvents(EventOptions options, ValidationBuilder builder) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs index a3a8e06..6653aff 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs @@ -68,8 +68,10 @@ public sealed class EventStreamService( // No `using` here — subscriber.Dispose() is called exactly once in the finally // block below, which also disposes the reader. A `using` declaration would add a // second Dispose on the same path and double-decrement the session subscriber count. + // The subscriber mode (single vs. multi) is derived inside AttachEventSubscriber from + // the session's own SessionEventStreaming.AllowMultipleEventSubscribers field — the + // same source the distributor uses — so the two cannot diverge. IEventSubscriberLease subscriber = session.AttachEventSubscriber( - options.Value.Sessions.AllowMultipleEventSubscribers, options.Value.Sessions.MaxEventSubscribersPerSession); int streamQueueDepth = 0; diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index d00a4b4..56bf7d1 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; using ZB.MOM.WW.MxGateway.Contracts.Proto; @@ -684,31 +685,31 @@ public sealed class GatewaySession /// s for this subscriber. The returned lease, when disposed, /// unregisters the distributor subscriber AND decrements the active-subscriber count. /// - /// - /// When , single-subscriber mode: a second concurrent EXTERNAL - /// subscriber is rejected with . - /// When , multi-subscriber mode: up to - /// concurrent EXTERNAL subscribers are allowed; the - /// next attach is rejected with - /// . - /// /// /// Maximum concurrent external subscribers in multi-subscriber mode - /// (MxGateway:Sessions:MaxEventSubscribersPerSession). Ignored when - /// is (the effective - /// cap is then 1). The gateway-owned internal dashboard subscriber is registered - /// directly on the distributor and is NOT counted here, so it never consumes cap budget. + /// (MxGateway:Sessions:MaxEventSubscribersPerSession). Ignored when the + /// session is in single-subscriber mode (AllowMultipleEventSubscribers == false); + /// the effective cap is then 1. The gateway-owned internal dashboard subscriber is + /// registered directly on the distributor and is NOT counted here, so it never + /// consumes cap budget. /// /// - /// The count-check-and-increment runs atomically under _syncRoot, so two - /// concurrent attaches racing toward the cap can never both succeed past it. On - /// distributor-register failure the count is rolled back (see the catch below). + /// The subscriber mode is derived internally from + /// — the same source + /// the uses to gate its FailFast decision — so + /// the cap-enforcement mode and the distributor's singleSubscriberMode field + /// cannot diverge. The count-check-and-increment runs atomically under + /// _syncRoot, so two concurrent attaches racing toward the cap can never both + /// succeed past it. On distributor-register failure the count is rolled back (see the + /// catch below). /// - public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers, int maxSubscribers) + public IEventSubscriberLease AttachEventSubscriber(int maxSubscribers) { - // Effective cap: 1 in single-subscriber mode, otherwise the configured maximum - // (clamped to at least 1 so a misconfigured non-positive value can never deadlock - // attaches in multi-subscriber mode). + // Derive the mode from the same source the distributor uses so the two can never + // diverge. Effective cap: 1 in single-subscriber mode, otherwise the configured + // maximum (clamped to at least 1 so a misconfigured non-positive value can never + // deadlock attaches in multi-subscriber mode). + bool allowMultipleSubscribers = _eventStreaming.AllowMultipleEventSubscribers; int effectiveCap = allowMultipleSubscribers ? Math.Max(1, maxSubscribers) : 1; lock (_syncRoot) @@ -1493,6 +1494,10 @@ public sealed class GatewaySession { lock (_syncRoot) { + // Assert in debug so a genuine double-decrement (a logic error) surfaces + // loudly; the clamp below keeps release builds safe if it somehow fires. + Debug.Assert(_activeEventSubscriberCount > 0, + "DetachEventSubscriber called with _activeEventSubscriberCount already at 0 — possible double-dispose."); if (_activeEventSubscriberCount > 0) { _activeEventSubscriberCount--; diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 71fa5bf..89cc1ce 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -236,7 +236,11 @@ public sealed class SessionEventDistributor : IAsyncDisposable } /// - /// Gets the count of currently-registered subscribers. + /// Gets the count of currently-registered subscribers. This count INCLUDES internal + /// subscribers (e.g. the gateway-owned dashboard mirror registered via + /// Register(isInternal: true)), and therefore differs from + /// , which tracks only external + /// (gRPC) subscribers and excludes the internal dashboard subscriber. /// public int SubscriberCount => _subscribers.Count; diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs index 0c07034..2f2b186 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs @@ -151,7 +151,7 @@ public sealed class GatewaySessionDashboardMirrorTests session.MarkReady(); Assert.Equal(0, session.ActiveEventSubscriberCount); - using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 1); + using IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1); Assert.Equal(1, session.ActiveEventSubscriberCount); } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs index d4bc8a0..33756fc 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs @@ -168,8 +168,8 @@ public sealed class GatewaySessionTests /// completion and a client cancellation both fire at the same time — must /// decrement _activeEventSubscriberCount exactly once, never to −1. /// A negative count permanently blocks future subscribers because - /// AttachEventSubscriber(allowMultipleSubscribers:false) gates on - /// _activeEventSubscriberCount > 0. After both racing disposes finish, + /// AttachEventSubscriber gates on _activeEventSubscriberCount >= effectiveCap. + /// After both racing disposes finish, /// the count must be exactly 0 and a subsequent single-subscriber attach must /// succeed. /// @@ -186,8 +186,7 @@ public sealed class GatewaySessionTests for (int i = 0; i < Iterations; i++) { // Attach one subscriber; this increments _activeEventSubscriberCount to 1. - IEventSubscriberLease lease = session.AttachEventSubscriber( - allowMultipleSubscribers: false, maxSubscribers: 1); + IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1); // Race Concurrency threads all calling Dispose() on the same lease. // Only one must actually run DetachEventSubscriber. @@ -212,8 +211,7 @@ public sealed class GatewaySessionTests // Observable contract: a fresh single subscriber must now be attachable // (i.e., the guard _activeEventSubscriberCount > 0 is false). - IEventSubscriberLease next = session.AttachEventSubscriber( - allowMultipleSubscribers: false, maxSubscribers: 1); + IEventSubscriberLease next = session.AttachEventSubscriber(maxSubscribers: 1); next.Dispose(); Assert.Equal(0, session.ActiveEventSubscriberCount); } @@ -233,11 +231,10 @@ public sealed class GatewaySessionTests FakeWorkerClient workerClient = new(); GatewaySession session = CreateReadySessionWithEventStreaming(workerClient); - using IEventSubscriberLease first = session.AttachEventSubscriber( - allowMultipleSubscribers: false, maxSubscribers: 8); + using IEventSubscriberLease first = session.AttachEventSubscriber(maxSubscribers: 8); SessionManagerException exception = Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 8)); + () => session.AttachEventSubscriber(maxSubscribers: 8)); Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode); Assert.Equal(1, session.ActiveEventSubscriberCount); @@ -262,13 +259,13 @@ public sealed class GatewaySessionTests List leases = []; for (int i = 0; i < Cap; i++) { - leases.Add(session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + leases.Add(session.AttachEventSubscriber(maxSubscribers: Cap)); } Assert.Equal(Cap, session.ActiveEventSubscriberCount); SessionManagerException exception = Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode); Assert.Equal(Cap, session.ActiveEventSubscriberCount); @@ -304,14 +301,14 @@ public sealed class GatewaySessionTests List leases = []; for (int i = 0; i < Cap; i++) { - leases.Add(session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + leases.Add(session.AttachEventSubscriber(maxSubscribers: Cap)); } Assert.Equal(Cap, session.ActiveEventSubscriberCount); // The (cap+1)-th still fails: the dashboard mirror did not eat a slot. SessionManagerException exception = Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); Assert.Equal(SessionManagerErrorCode.EventSubscriberLimitReached, exception.ErrorCode); foreach (IEventSubscriberLease lease in leases) @@ -356,8 +353,7 @@ public sealed class GatewaySessionTests await gate.WaitAsync(testTimeout); try { - IEventSubscriberLease lease = session.AttachEventSubscriber( - allowMultipleSubscribers: true, maxSubscribers: Cap); + IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: Cap); leases[index] = lease; Interlocked.Increment(ref successCount); } @@ -411,13 +407,13 @@ public sealed class GatewaySessionTests workerClient, allowMultipleEventSubscribers: true); - IEventSubscriberLease a = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap); - IEventSubscriberLease b = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap); + IEventSubscriberLease a = session.AttachEventSubscriber(maxSubscribers: Cap); + IEventSubscriberLease b = session.AttachEventSubscriber(maxSubscribers: Cap); Assert.Equal(Cap, session.ActiveEventSubscriberCount); // At cap: next attach is rejected. Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); // Dispose one — and dispose it twice. The second dispose must not double-free. a.Dispose(); @@ -425,10 +421,10 @@ public sealed class GatewaySessionTests Assert.Equal(1, session.ActiveEventSubscriberCount); // Exactly one slot is free, so exactly one fresh attach succeeds. - using IEventSubscriberLease c = session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap); + using IEventSubscriberLease c = session.AttachEventSubscriber(maxSubscribers: Cap); Assert.Equal(Cap, session.ActiveEventSubscriberCount); Assert.Throws( - () => session.AttachEventSubscriber(allowMultipleSubscribers: true, maxSubscribers: Cap)); + () => session.AttachEventSubscriber(maxSubscribers: Cap)); b.Dispose(); diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 5db1b5e..0d5bc8d 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -565,6 +565,9 @@ public sealed class SessionEventDistributorTests } }); + // Guard: ensure the handler actually fired before asserting its observed value. + // Without this the test could pass vacuously if the overflow never triggered. + Assert.Equal(1, Volatile.Read(ref observedSet)); Assert.True(Volatile.Read(ref observedValue), "isOnlySubscriber must be true for a lone external subscriber in single-subscriber mode."); } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs index 5cf43dc..61dc32e 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -743,7 +743,7 @@ public sealed class SessionManagerTests GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None); DateTimeOffset now = DateTimeOffset.UtcNow; session.ExtendLease(now.AddSeconds(-1)); - using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false, maxSubscribers: 1); + using IDisposable eventSubscriber = session.AttachEventSubscriber(maxSubscribers: 1); int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None);