feat(server): add MxGateway:Sessions:WorkerReadyWaitTimeoutMs (default off)
Adds WorkerReadyWaitTimeoutMs to SessionOptions (default 0 = disabled), validates >= 0 in GatewayOptionsValidator, documents it in GatewayConfiguration.md, and adds validator + default-value tests. No wait/poll logic is implemented here (that is Task 8).
This commit is contained in:
@@ -39,7 +39,8 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
|
||||
"LeaseSweepIntervalSeconds": 30,
|
||||
"DetachGraceSeconds": 30,
|
||||
"AllowMultipleEventSubscribers": false,
|
||||
"MaxEventSubscribersPerSession": 8
|
||||
"MaxEventSubscribersPerSession": 8,
|
||||
"WorkerReadyWaitTimeoutMs": 0
|
||||
},
|
||||
"Events": {
|
||||
"QueueCapacity": 10000,
|
||||
@@ -130,6 +131,7 @@ to avoid accidental large allocations from malformed or oversized frames.
|
||||
| `MxGateway:Sessions:DetachGraceSeconds` | `30` | Detach-grace retention window. When positive, a session whose last external (gRPC) event-stream subscriber drops is retained in `Ready` for this many seconds so a client can reconnect; if no external subscriber re-attaches within the window, the lease monitor closes it with `detach-grace-expired`. The internal dashboard mirror does not count as an external subscriber, so a dashboard-only session still enters detach-grace. `0` disables retention and reverts to closing only on normal lease expiry. Must be zero or greater. Reconnect/replay itself is implemented separately (Task 12); this option controls retention and expiry only. The effective close happens within the next sweep cycle after the window elapses — up to `LeaseSweepIntervalSeconds` after expiry. Operators wanting a firm minimum retention bound should set `DetachGraceSeconds` greater than `LeaseSweepIntervalSeconds`. |
|
||||
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. |
|
||||
| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. |
|
||||
| `MxGateway:Sessions:WorkerReadyWaitTimeoutMs` | `0` | Bounded time, in milliseconds, the gateway will wait for a worker to reach `Ready` when the session is already `Ready` but the worker state has transiently diverged (e.g. `Handshaking` after a heartbeat blip). Applies only to transient worker states; terminal states (`Faulted`/`Closing`/`Closed`/no worker) fail fast immediately regardless of this setting. `0` (the default) disables the wait and preserves the original fail-fast behavior. Must be greater than or equal to zero. |
|
||||
|
||||
All numeric session options must be greater than zero.
|
||||
|
||||
|
||||
@@ -185,6 +185,10 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
|
||||
options.DetachGraceSeconds,
|
||||
"MxGateway:Sessions:DetachGraceSeconds must be zero or greater (0 disables detach-grace retention).",
|
||||
builder);
|
||||
AddIfNegative(
|
||||
options.WorkerReadyWaitTimeoutMs,
|
||||
"MxGateway:Sessions:WorkerReadyWaitTimeoutMs must be greater than or equal to zero.",
|
||||
builder);
|
||||
|
||||
// NOTE: We intentionally do NOT reject !AllowMultipleEventSubscribers &&
|
||||
// MaxEventSubscribersPerSession > 1 as a hard validation error here. The default
|
||||
|
||||
@@ -56,4 +56,15 @@ public sealed class SessionOptions
|
||||
/// effectively 1 when it is <see langword="false"/>. Must be greater than zero.
|
||||
/// </summary>
|
||||
public int MaxEventSubscribersPerSession { get; init; } = 8;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the bounded time, in milliseconds, the gateway will wait for a worker client
|
||||
/// to reach <c>Ready</c> when the session itself is already <c>Ready</c> but the worker
|
||||
/// state has transiently diverged (e.g. <c>Handshaking</c> after a heartbeat blip).
|
||||
/// The wait applies only to transient worker states; terminal states
|
||||
/// (<c>Faulted</c>/<c>Closing</c>/<c>Closed</c>/no worker) fail fast immediately.
|
||||
/// A value of <c>0</c> (the default) disables the wait — the gateway keeps the original
|
||||
/// fail-fast behavior. Must be greater than or equal to zero.
|
||||
/// </summary>
|
||||
public int WorkerReadyWaitTimeoutMs { get; init; }
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ public sealed class GatewayOptionsTests
|
||||
Assert.Equal(30, options.Sessions.DetachGraceSeconds);
|
||||
Assert.False(options.Sessions.AllowMultipleEventSubscribers);
|
||||
Assert.Equal(8, options.Sessions.MaxEventSubscribersPerSession);
|
||||
Assert.Equal(0, options.Sessions.WorkerReadyWaitTimeoutMs);
|
||||
|
||||
Assert.Equal(10_000, options.Events.QueueCapacity);
|
||||
Assert.Equal(EventBackpressurePolicy.FailFast, options.Events.BackpressurePolicy);
|
||||
@@ -88,6 +89,7 @@ public sealed class GatewayOptionsTests
|
||||
[InlineData("MxGateway:Sessions:DefaultLeaseSeconds", "0", "MxGateway:Sessions:DefaultLeaseSeconds must be greater than zero.")]
|
||||
[InlineData("MxGateway:Sessions:LeaseSweepIntervalSeconds", "0", "MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.")]
|
||||
[InlineData("MxGateway:Sessions:DetachGraceSeconds", "-1", "MxGateway:Sessions:DetachGraceSeconds must be zero or greater (0 disables detach-grace retention).")]
|
||||
[InlineData("MxGateway:Sessions:WorkerReadyWaitTimeoutMs", "-1", "MxGateway:Sessions:WorkerReadyWaitTimeoutMs must be greater than or equal to zero.")]
|
||||
[InlineData("MxGateway:Events:QueueCapacity", "0", "MxGateway:Events:QueueCapacity must be greater than zero.")]
|
||||
[InlineData("MxGateway:Protocol:MaxGrpcMessageBytes", "0", "MxGateway:Protocol:MaxGrpcMessageBytes must be between")]
|
||||
[InlineData("MxGateway:Authentication:PepperSecretName", "", "MxGateway:Authentication:PepperSecretName is required")]
|
||||
|
||||
@@ -356,4 +356,41 @@ public sealed class GatewayOptionsValidatorTests
|
||||
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
|
||||
Assert.True(result.Succeeded);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// WorkerReadyWaitTimeoutMs validation
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Validate_Fails_WhenWorkerReadyWaitTimeoutMsIsNegative()
|
||||
{
|
||||
GatewayOptions options = CloneWithSessions(
|
||||
ValidOptions(),
|
||||
new SessionOptions { WorkerReadyWaitTimeoutMs = -1 });
|
||||
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
|
||||
Assert.True(result.Failed);
|
||||
Assert.Contains(
|
||||
result.Failures!,
|
||||
f => f.Contains("MxGateway:Sessions:WorkerReadyWaitTimeoutMs"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Validate_Succeeds_WhenWorkerReadyWaitTimeoutMsIsZero()
|
||||
{
|
||||
GatewayOptions options = CloneWithSessions(
|
||||
ValidOptions(),
|
||||
new SessionOptions { WorkerReadyWaitTimeoutMs = 0 });
|
||||
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
|
||||
Assert.True(result.Succeeded);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Validate_Succeeds_WhenWorkerReadyWaitTimeoutMsIsPositive()
|
||||
{
|
||||
GatewayOptions options = CloneWithSessions(
|
||||
ValidOptions(),
|
||||
new SessionOptions { WorkerReadyWaitTimeoutMs = 5000 });
|
||||
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
|
||||
Assert.True(result.Succeeded);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user