# Gateway Sessions

The sessions subsystem owns the in-memory representation of an active gateway-to-worker pairing and coordinates its lifecycle from open through close. Each `GatewaySession` corresponds to exactly one MXAccess worker process connected over a dedicated named pipe.

## Overview

A session is the gateway-side handle that callers use to invoke worker commands, stream worker events, and tear the worker down. The subsystem is split between the per-session state machine (`GatewaySession`), an in-memory directory (`SessionRegistry`), the orchestrator that opens and closes sessions (`SessionManager`), the worker construction step (`SessionWorkerClientFactory`), and a hosted service that drains sessions during host shutdown (`SessionShutdownHostedService`).

The three interfaces (`ISessionManager`, `ISessionRegistry`, `ISessionWorkerClientFactory`) are registered as singletons, and `SessionShutdownHostedService` as a hosted service, by `SessionServiceCollectionExtensions.AddGatewaySessions`.

## Key Types

### GatewaySession

`GatewaySession` is a sealed class that holds the identity, configured timeouts, worker client reference, and current `SessionState` for one session. State is protected by a private `_syncRoot` lock so that property reads and transitions are observed atomically by concurrent gRPC calls and the lease sweeper.

The session id is an opaque string in the form `session-{guid:N}` and the per-session pipe name is `mxaccess-gateway-{ProcessId}-{SessionId}`. Encoding the gateway PID into the pipe name avoids collisions when an old gateway process leaks pipes that the OS has not yet reclaimed.

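
The naming scheme above can be sketched as follows. This is a minimal illustration, not the gateway's code: the `SessionNaming` class and its method names are hypothetical, and only the two string formats come from the text.

```csharp
using System;

// Hypothetical helper; it illustrates only the documented string formats.
public static class SessionNaming
{
    // "session-{guid:N}": the N format yields 32 lowercase hex digits, no dashes.
    public static string NewSessionId() => $"session-{Guid.NewGuid():N}";

    // "mxaccess-gateway-{ProcessId}-{SessionId}": the gateway PID keeps the name
    // distinct from pipes left behind by an earlier gateway process.
    public static string PipeNameFor(string sessionId) =>
        $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
}
```
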
`SessionState` itself is the protobuf-generated enum from `ZB.MOM.WW.MxGateway.Contracts.Proto`, so it is shared between the gateway and clients on the wire.

```csharp
public void TransitionTo(SessionState nextState)
{
    lock (_syncRoot)
    {
        if (_state is SessionState.Closed)
        {
            return;
        }

        if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
        {
            return;
        }

        if (_state is SessionState.Closing
            && nextState is not SessionState.Closed
            && nextState is not SessionState.Faulted)
        {
            return;
        }

        _state = nextState;
    }
}
```

`Closed` is terminal, `Faulted` only allows a transition to `Closed`, and `Closing` only allows a transition to `Closed` or `Faulted`. This guards against late callbacks (worker exit, heartbeat timeout) re-animating a session that is already tearing down or torn down: once `CloseAsync` has set `Closing` under `_syncRoot`, no `TransitionTo(Ready)` from another thread can walk the session back to `Ready`. Both close-related writes (`Closing` and `Closed`) go through `_syncRoot` exactly like every other state write; `_closeLock` only serializes concurrent close attempts.

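
The three guards can also be read as a pure function from (current, requested) to the resulting state, which makes the rules easy to check in isolation. The sketch below is an assumption-laden illustration: `DemoState` is a hypothetical stand-in for the protobuf `SessionState` enum and `TransitionRules` does not exist in the gateway.

```csharp
// Hypothetical stand-in for the protobuf-generated SessionState enum.
public enum DemoState { Ready, Closing, Faulted, Closed }

public static class TransitionRules
{
    // Returns the state that results from requesting `next` while in `current`,
    // mirroring the three early returns in TransitionTo.
    public static DemoState Apply(DemoState current, DemoState next) => current switch
    {
        // Closed is terminal: every request is ignored.
        DemoState.Closed => DemoState.Closed,
        // Faulted may only move to Closed.
        DemoState.Faulted when next != DemoState.Closed => DemoState.Faulted,
        // Closing may only move to Closed or Faulted.
        DemoState.Closing when next != DemoState.Closed && next != DemoState.Faulted => DemoState.Closing,
        // Every other request is accepted.
        _ => next,
    };
}
```

For example, `TransitionRules.Apply(DemoState.Closing, DemoState.Ready)` leaves the state at `Closing`, which is the late-callback case described above.
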
### SessionManager (ISessionManager)

`SessionManager` is the orchestrator. It exposes `OpenSessionAsync`, `TryGetSession`, `InvokeAsync`, `ReadEventsAsync`, `CloseSessionAsync`, `KillWorkerAsync`, `CloseExpiredLeasesAsync`, and `ShutdownAsync`. It composes `ISessionRegistry`, `ISessionWorkerClientFactory`, `GatewayMetrics`, and `GatewayOptions`.

`CloseSessionAsync` and `KillWorkerAsync` are both end-of-life paths but differ in what they offer the worker:

- `CloseSessionAsync` is the graceful path: it calls `GatewaySession.CloseAsync`, which asks the worker to shut down via `IWorkerClient.ShutdownAsync` and only kills the process as a fallback if shutdown fails.
- `KillWorkerAsync` is the forceful path used by the dashboard's admin Kill button: it calls `GatewaySession.KillWorker` directly, which kills the worker process immediately with no graceful-shutdown attempt and transitions the session to `Closed`.

Both paths converge on the same registry/metrics cleanup, so the open-session slot is released and `mxgateway.sessions.closed` is incremented either way.

The number of concurrently open sessions is bounded by a `SemaphoreSlim` initialized to `GatewayOptions.Sessions.MaxSessions`. Open requests that exceed the bound throw `SessionManagerException` with `SessionLimitExceeded` rather than queuing; the caller is expected to retry.

```csharp
private void EnsureSessionCapacity()
{
    if (!_sessionSlots.Wait(0))
    {
        throw new SessionManagerException(
            SessionManagerErrorCode.SessionLimitExceeded,
            $"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
    }
}
```

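
The non-queuing behavior comes from `SemaphoreSlim.Wait(0)`, which returns `false` immediately instead of blocking when no slot is free. A minimal sketch of the acquire/release pairing, assuming a hypothetical `SessionSlots` wrapper (the real `SessionManager` holds the semaphore as a field and throws rather than returning `false`):

```csharp
using System;
using System.Threading;

// Hypothetical wrapper used only to show the slot accounting.
public sealed class SessionSlots : IDisposable
{
    private readonly SemaphoreSlim _slots;

    public SessionSlots(int maxSessions) => _slots = new SemaphoreSlim(maxSessions, maxSessions);

    // Wait(0) never blocks: it returns false at once when every slot is taken.
    public bool TryAcquire() => _slots.Wait(0);

    // Must be called exactly once for each successful TryAcquire.
    public void Release() => _slots.Release();

    public void Dispose() => _slots.Dispose();
}
```

Every path that ends a session, including a failed open, has to release the slot it acquired, otherwise the limit shrinks permanently.
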
`SessionManager` also defines four close-reason constants so that the metrics and worker shutdown paths agree on a fixed vocabulary: `DefaultCloseReason` (`"client-close"`), `GatewayShutdownReason` (`"gateway-shutdown"`), `LeaseExpiredReason` (`"lease-expired"`), and `DetachGraceExpiredReason` (`"detach-grace-expired"`).

### SessionRegistry (ISessionRegistry)

`SessionRegistry` is a thin wrapper over a `ConcurrentDictionary<string, GatewaySession>` keyed by session id with `StringComparer.Ordinal`. `Snapshot` materializes the values into an array so iteration callers (lease sweeper, shutdown) do not race with concurrent `TryAdd` and `TryRemove` calls.

`ActiveCount` filters out sessions whose state is `Closed`; this is consumed by metrics and the dashboard, where `Count` would otherwise momentarily over-report during teardown.

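
The difference between `Count`, `ActiveCount`, and `Snapshot` can be shown with a reduced registry. This is a sketch under stated assumptions: `DemoRegistry` is hypothetical, and each session is reduced to a single "is closed" flag instead of a `GatewaySession`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Hypothetical simplified registry: the value is true once a session is closed.
public sealed class DemoRegistry
{
    private readonly ConcurrentDictionary<string, bool> _closedById = new(StringComparer.Ordinal);

    public bool TryAdd(string sessionId) => _closedById.TryAdd(sessionId, false);

    public void MarkClosed(string sessionId) => _closedById[sessionId] = true;

    public bool TryRemove(string sessionId) => _closedById.TryRemove(sessionId, out _);

    // Counts every registered session, including closed ones awaiting removal.
    public int Count => _closedById.Count;

    // Counts only sessions that have not reached the closed state.
    public int ActiveCount => _closedById.Values.Count(closed => !closed);

    // Copies the current keys into an array; later adds and removes do not change it.
    public string[] Snapshot() => _closedById.Keys.ToArray();
}
```
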
### SessionWorkerClientFactory (ISessionWorkerClientFactory)

`SessionWorkerClientFactory.CreateAsync` is the only path that builds an `IWorkerClient`. It drives the session through the protobuf `SessionState` substates in order: `StartingWorker`, `WaitingForPipe`, `Handshaking`, `InitializingWorker`. The substates are wire-visible so the dashboard and clients can render startup progress.

A linked `CancellationTokenSource` enforces `session.StartupTimeout`. If startup fails or times out, the factory either kills the partially-constructed `WorkerClient` or, if the client was never built, kills the launched process and disposes the named pipe before rethrowing. A pure timeout is rewritten as `TimeoutException` so callers can distinguish it from caller-driven cancellation:

```csharp
if (exception is OperationCanceledException
    && startupCancellation.IsCancellationRequested
    && !cancellationToken.IsCancellationRequested)
{
    throw new TimeoutException(
        $"Worker session {session.SessionId} did not complete startup within {session.StartupTimeout}.",
        exception);
}
```

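
A self-contained sketch of the same pattern may help show where the two tokens come from. The `StartupTimeoutSketch` class and its `RunAsync` signature are hypothetical; only the linked-source idea and the timeout-versus-caller check are taken from the text above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StartupTimeoutSketch
{
    // Runs `startup` under a deadline. A cancellation caused by the deadline is
    // rethrown as TimeoutException; a cancellation requested by the caller
    // propagates unchanged as OperationCanceledException.
    public static async Task RunAsync(
        Func<CancellationToken, Task> startup,
        TimeSpan startupTimeout,
        CancellationToken cancellationToken)
    {
        // The linked source fires when the caller cancels or the timeout elapses.
        using var startupCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        startupCancellation.CancelAfter(startupTimeout);

        try
        {
            await startup(startupCancellation.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException exception)
            when (startupCancellation.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
        {
            throw new TimeoutException($"Startup did not complete within {startupTimeout}.", exception);
        }
    }
}
```
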
The named pipe is created with `maxNumberOfServerInstances: 1` so a second worker cannot connect to the same pipe name even if the first launch is still pending. Combined with the per-session nonce passed to the worker, this is the gateway's defense against a foreign process answering a pipe.

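
For reference, a server pipe limited to one instance can be created as sketched below. Only `maxNumberOfServerInstances: 1` is taken from the text; the direction, transmission mode, and `PipeOptions.Asynchronous` are assumptions, and `SessionPipeSketch` is a hypothetical name.

```csharp
using System.IO.Pipes;

public static class SessionPipeSketch
{
    // Creates the server end of a pipe that allows a single server instance.
    // Direction, transmission mode, and options are illustrative assumptions.
    public static NamedPipeServerStream Create(string pipeName) =>
        new NamedPipeServerStream(
            pipeName,
            PipeDirection.InOut,
            maxNumberOfServerInstances: 1,
            transmissionMode: PipeTransmissionMode.Byte,
            options: PipeOptions.Asynchronous);
}
```
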
### SessionShutdownHostedService

`SessionShutdownHostedService` is an `IHostedService` whose only job is to call `ISessionManager.ShutdownAsync` from `StopAsync`. It catches `OperationCanceledException` triggered by the host shutdown timeout and logs a warning so that an over-running shutdown does not surface as an unhandled exception.

### SessionOpenRequest

`SessionOpenRequest` is the gateway-internal record passed to `OpenSessionAsync`. It is constructed from the wire-level `OpenSessionRequest` via `SessionOpenRequest.FromContract`. Keeping a separate internal record means the gRPC layer can normalize input (defaulting backend, sanitizing strings) without leaking generated proto types into `SessionManager`.

```csharp
public sealed record SessionOpenRequest(
    string? RequestedBackend,
    string? ClientSessionName,
    string? ClientCorrelationId,
    Duration? CommandTimeout)
{
    public static SessionOpenRequest FromContract(OpenSessionRequest request)
    {
        ArgumentNullException.ThrowIfNull(request);

        return new SessionOpenRequest(
            request.RequestedBackend,
            request.ClientSessionName,
            request.ClientCorrelationId,
            request.CommandTimeout);
    }
}
```

### SessionCloseResult

`SessionCloseResult` is the record returned from a successful close. `AlreadyClosed` distinguishes "this call closed the session" from "the session was already closed when we acquired the close lock", which the metrics layer uses to avoid double-counting.

```csharp
public sealed record SessionCloseResult(
    string SessionId,
    SessionState FinalState,
    bool AlreadyClosed);
```

### SessionCloseStartedException

`SessionCloseStartedException` is `internal` and is only thrown from inside `GatewaySession.CloseAsync` when the close path has already begun mutating worker state and a subsequent step fails. `SessionManager.CloseSessionCoreAsync` catches it, marks the session faulted, increments the close-failed metric, removes the session from the registry, and rethrows it wrapped as `SessionManagerException` with `CloseFailed`. The intermediate type exists so the public API surface only ever exposes `SessionManagerException`.

### SessionManagerException and SessionManagerErrorCode

`SessionManagerException` is the single public error type emitted from this subsystem; the code is carried in the `ErrorCode` property and is also surfaced to metrics tags via `SessionManagerErrorCode.ToString()`.

| Code | Meaning |
|------|---------|
| `SessionNotFound` | The session id is not in the registry. |
| `SessionNotReady` | The session or its `IWorkerClient` is not in `Ready` state. |
| `EventSubscriberAlreadyActive` | A second event subscriber attached in single-subscriber mode (`AllowMultipleEventSubscribers` is `false`). |
| `EventSubscriberLimitReached` | In multi-subscriber mode, an attach exceeded `MaxEventSubscribersPerSession` concurrent external subscribers. |
| `EventQueueOverflow` | Reserved for the worker event channel overflow path. |
| `SessionLimitExceeded` | `MaxSessions` is in use. |
| `OpenFailed` | `OpenSessionAsync` failed; the inner exception carries the cause. |
| `CloseFailed` | A close started but did not complete cleanly; the session is removed and faulted. |

## Lifecycle

### Open

`SessionManager.OpenSessionAsync` allocates a session slot, builds the `GatewaySession`, registers it, and asks the factory to bring up the worker. Failures roll back every preceding step:

```csharp
catch (Exception exception)
{
    session?.MarkFaulted(exception.Message);
    if (session is not null)
    {
        _registry.TryRemove(session.SessionId, out _);
        await session.DisposeAsync().ConfigureAwait(false);
    }

    ReleaseSessionSlot();
    _metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
    _logger.LogWarning(
        exception,
        "Failed to open gateway session {SessionId}.",
        session?.SessionId ?? "<not-created>");

    throw new SessionManagerException(
        SessionManagerErrorCode.OpenFailed,
        session is null ? "Failed to create session." : $"Failed to open session {session.SessionId}.",
        exception);
}
```

The order (fault, deregister, dispose, release slot, record metric, log, rethrow) matters because releasing the semaphore before disposal would let the next open race the worker process tear-down on the same machine.

### Run

While `Ready`, callers reach the worker through `SessionManager.InvokeAsync` or `ReadEventsAsync`. Both delegate to `GatewaySession`, which checks the state under lock and updates `LastClientActivityAt` on every invocation. `GatewaySession` also exposes typed bulk helpers (`AddItemBulkAsync`, `SubscribeBulkAsync`, etc.) that wrap `WorkerCommand` round-trips and translate non-`Ok` `ProtocolStatus` replies into `SessionManagerException` with `SessionNotReady`.

Event streaming uses `AttachEventSubscriber`, which returns a disposable lease. When `allowMultipleSubscribers` is false (single-subscriber mode), a second attach throws `EventSubscriberAlreadyActive`; this prevents two gRPC streams from racing on the same worker event channel. When it is true, up to `MaxEventSubscribersPerSession` concurrent external subscribers are allowed and the next attach throws `EventSubscriberLimitReached`. The count-check-and-increment is atomic under the session lock, so concurrent attaches can never exceed the cap. The gateway-owned internal dashboard mirror subscriber is registered directly on the distributor and does not count toward the cap. Active event subscribers keep the session lease from expiring until the stream is disposed.

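
The attach accounting can be sketched as a counter guarded by one lock plus a disposable lease. Everything here is illustrative: `SubscriberGate` is a hypothetical type, and `InvalidOperationException` stands in for the `SessionManagerException` codes named above.

```csharp
using System;
using System.Threading;

// Hypothetical reduction of attach/detach accounting to a counter and a lock.
public sealed class SubscriberGate
{
    private readonly object _syncRoot = new();
    private readonly int _maxSubscribers;
    private int _activeCount;

    // allowMultiple == false reproduces single-subscriber mode (a cap of one).
    public SubscriberGate(bool allowMultiple, int maxSubscribersPerSession) =>
        _maxSubscribers = allowMultiple ? maxSubscribersPerSession : 1;

    public int ActiveCount { get { lock (_syncRoot) { return _activeCount; } } }

    // The check and the increment share one lock, so two racing attaches
    // cannot both claim the last free slot.
    public IDisposable Attach()
    {
        lock (_syncRoot)
        {
            if (_activeCount >= _maxSubscribers)
            {
                throw new InvalidOperationException("Subscriber limit reached.");
            }

            _activeCount++;
        }

        return new Lease(this);
    }

    private sealed class Lease : IDisposable
    {
        private SubscriberGate? _owner;

        public Lease(SubscriberGate owner) => _owner = owner;

        // Idempotent: disposing twice releases the slot only once.
        public void Dispose()
        {
            SubscriberGate? owner = Interlocked.Exchange(ref _owner, null);
            if (owner is null)
            {
                return;
            }

            lock (owner._syncRoot) { owner._activeCount--; }
        }
    }
}
```
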
`FailFast` event backpressure faults the whole session only in single-subscriber mode; in multi-subscriber mode it degrades to a per-subscriber disconnect so one slow consumer never faults a session shared by others. The session passes its mode to the `SessionEventDistributor` at construction, so this decision is made on the fixed mode rather than a live subscriber-count snapshot.

A session's initial lease expiry is its open timestamp plus `MxGateway:Sessions:DefaultLeaseSeconds` (default 1800). Unary client activity refreshes the lease by the same duration. `ExtendLease` and `IsLeaseExpired` cooperate with `SessionManager.CloseExpiredLeasesAsync`, which iterates a registry snapshot and closes any session whose lease has expired with `LeaseExpiredReason`. `SessionLeaseMonitorHostedService` runs that sweep every `MxGateway:Sessions:LeaseSweepIntervalSeconds` seconds (default 30).

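
A minimal sketch of the lease arithmetic, assuming a hypothetical `SessionLease` type that receives the current time as a parameter. Whether the real `IsLeaseExpired` treats the exact expiry instant as expired is not stated in this document; the `>=` comparison below is an assumption.

```csharp
using System;

// Hypothetical lease bookkeeping; time is passed in so the logic is easy to test.
public sealed class SessionLease
{
    private readonly object _syncRoot = new();
    private readonly TimeSpan _duration;
    private DateTimeOffset _expiresAt;

    public SessionLease(DateTimeOffset openedAt, TimeSpan duration)
    {
        _duration = duration;
        _expiresAt = openedAt + duration;
    }

    // Called on client activity: moves the expiry to now + duration.
    public void Extend(DateTimeOffset now)
    {
        lock (_syncRoot) { _expiresAt = now + _duration; }
    }

    // Assumption: the lease counts as expired at the expiry instant itself.
    public bool IsExpired(DateTimeOffset now)
    {
        lock (_syncRoot) { return now >= _expiresAt; }
    }
}
```
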
#### Detach-grace retention

`MxGateway:Sessions:DetachGraceSeconds` (default 30) is a bounded retention window kept after a session's *last external (gRPC) event-stream subscriber* drops, so a client can reconnect to the same session instead of having it torn down on the first stream disconnect. While the window is open the session stays `Ready` and fully usable: worker commands continue to work and a reconnecting subscriber re-attaches normally. Because retention is keyed on the *external* subscriber count (`_activeEventSubscriberCount`), and the gateway-owned internal dashboard mirror registers directly on the distributor with `isInternal: true` and is therefore *not* counted, a session whose only remaining subscriber is the dashboard mirror still enters detach-grace.

Mechanically: when the last external subscriber detaches and `DetachGraceSeconds > 0`, `DetachEventSubscriber` stamps `DetachedAtUtc` from the session's `TimeProvider` under `_syncRoot` (the detach→grace-start transition). `AttachEventSubscriber` clears `DetachedAtUtc` under the same lock when a subscriber re-attaches (the reattach→grace-cancel transition), so the detach, the re-attach, and the sweeper's read all serialize on `_syncRoot`. `SessionManager.CloseExpiredLeasesAsync` checks `IsDetachGraceExpired(now)` alongside `IsLeaseExpired(now)`: a session detached for at least `DetachGraceSeconds` with no active external subscriber is closed by the same lease sweep, with the distinct `DetachGraceExpiredReason` (`"detach-grace-expired"`) so operators can tell a short reconnect-window expiry from a long idle-lease expiry. Setting `DetachGraceSeconds` to `0` disables the detach-grace check and reverts to the original behavior: a detached session stays open until its normal lease expires.

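
The grace-window bookkeeping can be sketched as below. `DetachGraceTracker` is a hypothetical reduction: it keeps only the external subscriber count, the detach timestamp, and the lock, and it takes the current time as a parameter instead of reading a `TimeProvider`.

```csharp
using System;

// Hypothetical reduction of the detach-grace bookkeeping.
public sealed class DetachGraceTracker
{
    private readonly object _syncRoot = new();
    private readonly TimeSpan _grace;
    private int _externalSubscribers;
    private DateTimeOffset? _detachedAtUtc;

    public DetachGraceTracker(TimeSpan grace) => _grace = grace;

    public void Attach()
    {
        lock (_syncRoot)
        {
            _externalSubscribers++;
            _detachedAtUtc = null; // a re-attach cancels the grace window
        }
    }

    public void Detach(DateTimeOffset now)
    {
        lock (_syncRoot)
        {
            _externalSubscribers--;
            if (_externalSubscribers == 0 && _grace > TimeSpan.Zero)
            {
                _detachedAtUtc = now; // last external subscriber left: grace starts
            }
        }
    }

    // True once the session has had no external subscriber for at least the grace period.
    public bool IsDetachGraceExpired(DateTimeOffset now)
    {
        lock (_syncRoot)
        {
            return _externalSubscribers == 0
                && _detachedAtUtc is DateTimeOffset detachedAt
                && now - detachedAt >= _grace;
        }
    }
}
```

With a grace of zero the timestamp is never stamped, so the check never fires and only the normal lease can expire the session.
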
The reconnect/replay path that re-attaches a dropped client to a retained session is implemented separately (Task 12); `DetachGraceSeconds` controls retention and expiry only.

### Close

`GatewaySession.CloseAsync` is serialized by a per-session `SemaphoreSlim` (`_closeLock`) so only one close runs at a time, but every read/write of `_state` still passes through `_syncRoot` (via `TryBeginClose` and `MarkClosed`). The close path therefore obeys the same lock discipline as `TransitionTo` / `MarkFaulted`: it transitions to `Closing`, asks the worker client to shut down within `ShutdownTimeout`, and on success transitions to `Closed`. `DisposeAsync` waits on `_closeLock` once before disposing the semaphore so an in-flight close's `Release()` cannot race against the dispose. If `IWorkerClient.ShutdownAsync` throws, the session falls back to `IWorkerClient.Kill` (forced close):

```csharp
if (_workerClient is not null)
{
    try
    {
        await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception exception)
    {
        // Graceful shutdown failed: fall back to a forced kill.
        try
        {
            _workerClient.Kill(reason);
        }
        catch (Exception killException)
        {
            // Both attempts failed: report the pair as one close failure.
            throw new SessionCloseStartedException(
                $"Session {SessionId} close failed after worker shutdown started.",
                new AggregateException(exception, killException));
        }

        // The kill succeeded: rethrow the original shutdown failure.
        throw;
    }
}
```

If both graceful shutdown and the kill fall-back fail, the original and kill exceptions are bundled into an `AggregateException` and surfaced as `SessionCloseStartedException`. `SessionManager.CloseSessionCoreAsync` then translates that into a `SessionManagerException` with `CloseFailed` and removes the session.

`GatewaySession.KillWorker` is the unconditional forced-close path used by shutdown when graceful close itself throws, and also by `SessionManager.KillWorkerAsync`, the explicit kill path that the dashboard's admin Kill button invokes. `KillWorkerAsync` skips `WorkerClient.ShutdownAsync` entirely, so `KillCount` increments while `ShutdownCount` does not; the session is then removed from the registry and the open-session slot is released, identical to the cleanup that follows a successful `CloseSessionAsync`.

## Shutdown Coordination

`SessionShutdownHostedService.StopAsync` calls `SessionManager.ShutdownAsync`, which closes every registered session with `GatewayShutdownReason`. The shutdown loop catches per-session exceptions, calls `KillWorker`, and removes the session so that one stuck worker cannot block the rest of the host:

```csharp
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
    foreach (GatewaySession session in _registry.Snapshot())
    {
        try
        {
            await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            _logger.LogWarning(
                exception,
                "Graceful shutdown failed for session {SessionId}; killing worker.",
                session.SessionId);
            if (_registry.TryGet(session.SessionId, out _))
            {
                session.KillWorker(GatewayShutdownReason);
                await RemoveSessionAsync(session).ConfigureAwait(false);
            }
        }
    }
}
```

Iterating over `Snapshot` rather than the live dictionary gives the loop a stable, point-in-time set of sessions, so `RemoveSessionAsync` can mutate the registry inside the loop without affecting the iteration.

## Dependency Injection

`SessionServiceCollectionExtensions.AddGatewaySessions` registers the three singletons and the hosted service:

```csharp
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
    services.AddSingleton<ISessionRegistry, SessionRegistry>();
    services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
    services.AddSingleton<ISessionManager, SessionManager>();
    services.AddHostedService<SessionShutdownHostedService>();

    return services;
}
```

The registry must be a singleton because its `ConcurrentDictionary` is the source of truth for session state across the gRPC service, the lease sweeper, the dashboard, and the shutdown hosted service. Registering `SessionShutdownHostedService` last ensures it is constructed after `ISessionManager` and therefore drains sessions during host stop.

## Related Documentation

- [Gateway Process Design](./GatewayProcessDesign.md)
- [Gateway Configuration](./GatewayConfiguration.md)
- [Worker Process Launcher](./WorkerProcessLauncher.md)