From bd190ab012875a78c159de9e7c8a59249d4527a3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 15:13:21 -0400 Subject: [PATCH] feat(config): allow multiple event subscribers + add MaxEventSubscribersPerSession cap Remove the hard-rejection of AllowMultipleEventSubscribers=true in GatewayOptionsValidator (fan-out is now implemented via SessionEventDistributor). Add MaxEventSubscribersPerSession (default 8, must be >= 1) to SessionOptions, validate it, expose it in EffectiveSessionConfiguration / GatewayConfigurationProvider, document it in GatewayConfiguration.md and appsettings.json. Tests cover the no-error path for AllowMultipleEventSubscribers=true, the 0/-1 rejection, positive pass, and default pass. --- docs/GatewayConfiguration.md | 7 +- .../EffectiveSessionConfiguration.cs | 3 +- .../GatewayConfigurationProvider.cs | 3 +- .../Configuration/GatewayOptionsValidator.cs | 10 ++- .../Configuration/SessionOptions.cs | 7 ++ .../appsettings.json | 3 +- .../GatewayOptionsValidatorTests.cs | 67 +++++++++++++++++++ 7 files changed, 87 insertions(+), 13 deletions(-) diff --git a/docs/GatewayConfiguration.md b/docs/GatewayConfiguration.md index db57112..338c7c1 100644 --- a/docs/GatewayConfiguration.md +++ b/docs/GatewayConfiguration.md @@ -125,11 +125,10 @@ to avoid accidental large allocations from malformed or oversized frames. | `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. | | `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. | | `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. | -| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. `true` is rejected until event fan-out is implemented. | +| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. | +| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. | -All numeric session options must be greater than zero. The current event stream -implementation supports one active subscriber per session; this preserves event -ordering and avoids competing consumers. +All numeric session options must be greater than zero. ## Event Options diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveSessionConfiguration.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveSessionConfiguration.cs index 22ef89b..0558b5b 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveSessionConfiguration.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveSessionConfiguration.cs @@ -6,4 +6,5 @@ public sealed record EffectiveSessionConfiguration( int MaxPendingCommandsPerSession, int DefaultLeaseSeconds, int LeaseSweepIntervalSeconds, - bool AllowMultipleEventSubscribers); + bool AllowMultipleEventSubscribers, + int MaxEventSubscribersPerSession); diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs index dccb834..a09956b 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs @@ -46,7 +46,8 @@ public sealed class GatewayConfigurationProvider(IOptions option MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession, DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds, LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds, - AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers), + AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers, + MaxEventSubscribersPerSession: value.Sessions.MaxEventSubscribersPerSession), Events: new EffectiveEventConfiguration( QueueCapacity: value.Events.QueueCapacity, BackpressurePolicy: value.Events.BackpressurePolicy.ToString(), diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs index f93ec03..dbaf7d6 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs @@ -177,12 +177,10 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase public bool AllowMultipleEventSubscribers { get; init; } + + /// + /// Gets the maximum number of concurrent event subscribers per session. + /// Applies when is ; + /// effectively 1 when it is . Must be greater than zero. + /// + public int MaxEventSubscribersPerSession { get; init; } = 8; } diff --git a/src/ZB.MOM.WW.MxGateway.Server/appsettings.json b/src/ZB.MOM.WW.MxGateway.Server/appsettings.json index d64ab14..db21287 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/appsettings.json +++ b/src/ZB.MOM.WW.MxGateway.Server/appsettings.json @@ -46,7 +46,8 @@ "MaxPendingCommandsPerSession": 128, "DefaultLeaseSeconds": 1800, "LeaseSweepIntervalSeconds": 30, - "AllowMultipleEventSubscribers": false + "AllowMultipleEventSubscribers": false, + "MaxEventSubscribersPerSession": 8 }, "Events": { "QueueCapacity": 10000, diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Configuration/GatewayOptionsValidatorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Configuration/GatewayOptionsValidatorTests.cs index fef9630..11a14fc 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Configuration/GatewayOptionsValidatorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Configuration/GatewayOptionsValidatorTests.cs @@ -289,4 +289,71 @@ public sealed class GatewayOptionsValidatorTests Assert.True(result.Failed); Assert.Contains(result.Failures!, f => f.Contains(keyPart)); } + + // ------------------------------------------------------------------------- + // AllowMultipleEventSubscribers / MaxEventSubscribersPerSession validation + // ------------------------------------------------------------------------- + + private static GatewayOptions CloneWithSessions(GatewayOptions source, SessionOptions sessions) + => new() + { + Authentication = source.Authentication, + Ldap = source.Ldap, + Worker = source.Worker, + Sessions = sessions, + Events = source.Events, + Dashboard = source.Dashboard, + Protocol = source.Protocol, + Alarms = source.Alarms, + Tls = source.Tls, + }; + + [Fact] + public void Validate_Succeeds_WhenAllowMultipleEventSubscribersIsTrue() + { + // AllowMultipleEventSubscribers=true must now validate cleanly (no longer rejected). + GatewayOptions options = CloneWithSessions( + ValidOptions(), + new SessionOptions { AllowMultipleEventSubscribers = true }); + ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options); + Assert.True(result.Succeeded); + } + + [Theory] + [InlineData(0)] + [InlineData(-1)] + public void Validate_Fails_WhenMaxEventSubscribersPerSessionBelowOne(int value) + { + GatewayOptions options = CloneWithSessions( + ValidOptions(), + new SessionOptions { MaxEventSubscribersPerSession = value }); + ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options); + Assert.True(result.Failed); + Assert.Contains( + result.Failures!, + f => f.Contains("MxGateway:Sessions:MaxEventSubscribersPerSession")); + } + + [Theory] + [InlineData(1)] + [InlineData(8)] + [InlineData(32)] + public void Validate_Succeeds_WhenMaxEventSubscribersPerSessionIsPositive(int value) + { + GatewayOptions options = CloneWithSessions( + ValidOptions(), + new SessionOptions { MaxEventSubscribersPerSession = value }); + ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options); + Assert.True(result.Succeeded); + } + + [Fact] + public void Validate_Succeeds_WithDefaultSessionOptions() + { + // Default SessionOptions (AllowMultipleEventSubscribers=false, MaxEventSubscribersPerSession=8) + // must validate cleanly. + GatewayOptions options = CloneWithSessions(ValidOptions(), new SessionOptions()); + ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options); + Assert.True(result.Succeeded); + } }