feat(config): allow multiple event subscribers + add MaxEventSubscribersPerSession cap

Remove the hard-rejection of AllowMultipleEventSubscribers=true in GatewayOptionsValidator
(fan-out is now implemented via SessionEventDistributor). Add MaxEventSubscribersPerSession
(default 8, must be >= 1) to SessionOptions, validate it, expose it in
EffectiveSessionConfiguration / GatewayConfigurationProvider, document it in
GatewayConfiguration.md and appsettings.json. Tests cover the no-error path for
AllowMultipleEventSubscribers=true, the 0/-1 rejection, positive pass, and default pass.
This commit is contained in:
Joseph Doherty
2026-06-15 15:13:21 -04:00
parent 2ead9bc200
commit bd190ab012
7 changed files with 87 additions and 13 deletions
+3 -4
View File
@@ -125,11 +125,10 @@ to avoid accidental large allocations from malformed or oversized frames.
| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. |
| `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. |
| `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. |
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. `true` is rejected until event fan-out is implemented. |
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. |
| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. |
All numeric session options must be greater than zero. The current event stream
implementation supports one active subscriber per session; this preserves event
ordering and avoids competing consumers.
All numeric session options must be greater than zero.
## Event Options
@@ -6,4 +6,5 @@ public sealed record EffectiveSessionConfiguration(
int MaxPendingCommandsPerSession,
int DefaultLeaseSeconds,
int LeaseSweepIntervalSeconds,
bool AllowMultipleEventSubscribers);
bool AllowMultipleEventSubscribers,
int MaxEventSubscribersPerSession);
@@ -46,7 +46,8 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers,
MaxEventSubscribersPerSession: value.Sessions.MaxEventSubscribersPerSession),
Events: new EffectiveEventConfiguration(
QueueCapacity: value.Events.QueueCapacity,
BackpressurePolicy: value.Events.BackpressurePolicy.ToString(),
@@ -177,12 +177,10 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
options.LeaseSweepIntervalSeconds,
"MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.",
builder);
if (options.AllowMultipleEventSubscribers)
{
builder.Add(
"MxGateway:Sessions:AllowMultipleEventSubscribers is not supported until event fan-out is implemented.");
}
AddIfNotPositive(
options.MaxEventSubscribersPerSession,
"MxGateway:Sessions:MaxEventSubscribersPerSession must be greater than zero.",
builder);
}
private static void ValidateEvents(EventOptions options, ValidationBuilder builder)
@@ -27,4 +27,11 @@ public sealed class SessionOptions
/// Gets a value indicating whether multiple event subscribers are allowed per session.
/// </summary>
public bool AllowMultipleEventSubscribers { get; init; }
/// <summary>
/// Gets the maximum number of concurrent event subscribers per session.
/// Applies when <see cref="AllowMultipleEventSubscribers"/> is <see langword="true"/>;
/// effectively 1 when it is <see langword="false"/>. Must be greater than zero.
/// </summary>
public int MaxEventSubscribersPerSession { get; init; } = 8;
}
@@ -46,7 +46,8 @@
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"AllowMultipleEventSubscribers": false
"AllowMultipleEventSubscribers": false,
"MaxEventSubscribersPerSession": 8
},
"Events": {
"QueueCapacity": 10000,
@@ -289,4 +289,71 @@ public sealed class GatewayOptionsValidatorTests
Assert.True(result.Failed);
Assert.Contains(result.Failures!, f => f.Contains(keyPart));
}
// -------------------------------------------------------------------------
// AllowMultipleEventSubscribers / MaxEventSubscribersPerSession validation
// -------------------------------------------------------------------------
private static GatewayOptions CloneWithSessions(GatewayOptions source, SessionOptions sessions)
=> new()
{
Authentication = source.Authentication,
Ldap = source.Ldap,
Worker = source.Worker,
Sessions = sessions,
Events = source.Events,
Dashboard = source.Dashboard,
Protocol = source.Protocol,
Alarms = source.Alarms,
Tls = source.Tls,
};
[Fact]
public void Validate_Succeeds_WhenAllowMultipleEventSubscribersIsTrue()
{
// AllowMultipleEventSubscribers=true must now validate cleanly (no longer rejected).
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { AllowMultipleEventSubscribers = true });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
[Theory]
[InlineData(0)]
[InlineData(-1)]
public void Validate_Fails_WhenMaxEventSubscribersPerSessionBelowOne(int value)
{
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { MaxEventSubscribersPerSession = value });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Failed);
Assert.Contains(
result.Failures!,
f => f.Contains("MxGateway:Sessions:MaxEventSubscribersPerSession"));
}
[Theory]
[InlineData(1)]
[InlineData(8)]
[InlineData(32)]
public void Validate_Succeeds_WhenMaxEventSubscribersPerSessionIsPositive(int value)
{
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { MaxEventSubscribersPerSession = value });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
[Fact]
public void Validate_Succeeds_WithDefaultSessionOptions()
{
// Default SessionOptions (AllowMultipleEventSubscribers=false, MaxEventSubscribersPerSession=8)
// must validate cleanly.
GatewayOptions options = CloneWithSessions(ValidOptions(), new SessionOptions());
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
}