mxaccessgw/docs/Sessions.md
Gateway Sessions

The sessions subsystem owns the in-memory representation of an active gateway-to-worker pairing and coordinates its lifecycle from open through close. Each GatewaySession corresponds to exactly one MXAccess worker process connected over a dedicated named pipe.

Overview

A session is the gateway-side handle that callers use to invoke worker commands, stream worker events, and tear the worker down. The subsystem is split across five types: the per-session state machine (GatewaySession), an in-memory directory (SessionRegistry), the orchestrator that opens and closes sessions (SessionManager), the worker construction step (SessionWorkerClientFactory), and a hosted service that drains sessions during host shutdown (SessionShutdownHostedService).

The three interfaces (ISessionManager, ISessionRegistry, ISessionWorkerClientFactory) are wired as singletons, and SessionShutdownHostedService is registered as a hosted service, by SessionServiceCollectionExtensions.AddGatewaySessions.

Key Types

GatewaySession

GatewaySession is a sealed class that holds the identity, configured timeouts, worker client reference, and current SessionState for one session. State is protected by a private _syncRoot lock so that property reads and transitions are observed atomically by concurrent gRPC calls and the lease sweeper.

The session id is an opaque string in the form session-{guid:N} and the per-session pipe name is mxaccess-gateway-{ProcessId}-{SessionId}. Encoding the gateway PID into the pipe name avoids collisions when an old gateway process leaks pipes that the OS has not yet reclaimed.
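The two naming formats above can be sketched as follows. This is a minimal illustration, not the gateway's code: the SessionNaming class and its method names are hypothetical, and only the two format strings come from the text.

```csharp
using System;

// Hypothetical helper for illustration; only the format strings are taken from the document.
public static class SessionNaming
{
    // "session-" followed by a GUID in "N" format (32 hex digits, no dashes).
    public static string NewSessionId() => $"session-{Guid.NewGuid():N}";

    // Embeds the gateway process id so pipes from two gateway processes cannot collide.
    public static string PipeNameFor(int gatewayProcessId, string sessionId) =>
        $"mxaccess-gateway-{gatewayProcessId}-{sessionId}";
}
```

A caller would typically pass Environment.ProcessId as the first argument of PipeNameFor.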

SessionState itself is the protobuf-generated enum from MxGateway.Contracts.Proto, so it is shared between the gateway and clients on the wire.

public void TransitionTo(SessionState nextState)
{
    lock (_syncRoot)
    {
        if (_state is SessionState.Closed)
        {
            return;
        }

        if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
        {
            return;
        }

        if (_state is SessionState.Closing
            && nextState is not SessionState.Closed
            && nextState is not SessionState.Faulted)
        {
            return;
        }

        _state = nextState;
    }
}

Closed is terminal, Faulted only allows a transition to Closed, and Closing only allows a transition to Closed or Faulted. This guards against late callbacks (worker exit, heartbeat timeout) re-animating a session that is already tearing down or torn down — once CloseAsync has set Closing under _syncRoot, no TransitionTo(Ready) from another thread can walk the session back to Ready. Both close-related writes (Closing and Closed) go through _syncRoot exactly like every other state write; _closeLock only serializes concurrent close attempts.
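The transition rules can be exercised in isolation with a small stand-in. In this sketch DemoState and DemoSession are hypothetical names; the real code uses the protobuf SessionState enum, which has more members than the four shown here.

```csharp
using System;

// Local stand-in for the protobuf SessionState enum (assumption: reduced to four states).
public enum DemoState { Ready, Closing, Faulted, Closed }

public sealed class DemoSession
{
    private readonly object _syncRoot = new();
    private DemoState _state = DemoState.Ready;

    public DemoState State
    {
        get { lock (_syncRoot) { return _state; } }
    }

    public void TransitionTo(DemoState nextState)
    {
        lock (_syncRoot)
        {
            // Closed is terminal.
            if (_state is DemoState.Closed) return;

            // Faulted may only move to Closed.
            if (_state is DemoState.Faulted && nextState is not DemoState.Closed) return;

            // Closing may only move to Closed or Faulted.
            if (_state is DemoState.Closing
                && nextState is not DemoState.Closed
                && nextState is not DemoState.Faulted) return;

            _state = nextState;
        }
    }
}
```

A late TransitionTo(Ready) after Closing is silently ignored rather than thrown, so callback paths do not need their own state checks.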

SessionManager (ISessionManager)

SessionManager is the orchestrator. It exposes OpenSessionAsync, TryGetSession, InvokeAsync, ReadEventsAsync, CloseSessionAsync, CloseExpiredLeasesAsync, and ShutdownAsync. It composes ISessionRegistry, ISessionWorkerClientFactory, GatewayMetrics, and GatewayOptions.

The number of concurrently open sessions is bounded by a SemaphoreSlim initialized to GatewayOptions.Sessions.MaxSessions. An open request that would exceed the bound fails immediately with SessionManagerException (SessionLimitExceeded) instead of queuing; the caller is expected to retry.

private void EnsureSessionCapacity()
{
    if (!_sessionSlots.Wait(0))
    {
        throw new SessionManagerException(
            SessionManagerErrorCode.SessionLimitExceeded,
            $"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
    }
}
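The non-blocking acquire pairs with a release when a session closes or an open fails. The sketch below shows that pairing with a hypothetical SlotLimiter class; it assumes the gateway's slot accounting behaves like a plain counting semaphore.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper illustrating fail-fast slot accounting.
public sealed class SlotLimiter : IDisposable
{
    private readonly SemaphoreSlim _slots;

    public SlotLimiter(int maxSessions) =>
        _slots = new SemaphoreSlim(maxSessions, maxSessions);

    // Wait(0) never blocks: it returns false immediately when no slot is free.
    public bool TryAcquire() => _slots.Wait(0);

    // Must be called exactly once per successful TryAcquire.
    public void Release() => _slots.Release();

    public void Dispose() => _slots.Dispose();
}
```

Passing maxSessions as both the initial and maximum count means an unbalanced extra Release throws SemaphoreFullException instead of silently raising the limit.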

SessionManager also defines three close-reason constants — DefaultCloseReason ("client-close"), GatewayShutdownReason ("gateway-shutdown"), and LeaseExpiredReason ("lease-expired") — so that the metrics and worker shutdown paths agree on a fixed vocabulary.

SessionRegistry (ISessionRegistry)

SessionRegistry is a thin wrapper over a ConcurrentDictionary<string, GatewaySession> keyed by session id with StringComparer.Ordinal. Snapshot materializes the values into an array so iteration callers (lease sweeper, shutdown) do not race with concurrent TryAdd and TryRemove calls.

ActiveCount filters out sessions whose state is Closed; this is consumed by metrics and the dashboard, where Count would otherwise momentarily over-report during teardown.
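A rough sketch of the registry shape described above, using hypothetical DemoRegistry and DemoEntry types. The IsClosed flag stands in for a session whose state is Closed; the real registry stores GatewaySession instances.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Stand-in for GatewaySession; IsClosed models "state is Closed".
public sealed class DemoEntry
{
    public DemoEntry(string id) => Id = id;
    public string Id { get; }
    public volatile bool IsClosed;
}

public sealed class DemoRegistry
{
    private readonly ConcurrentDictionary<string, DemoEntry> _sessions =
        new(StringComparer.Ordinal);

    public bool TryAdd(DemoEntry entry) => _sessions.TryAdd(entry.Id, entry);

    public bool TryRemove(string id) => _sessions.TryRemove(id, out _);

    // Point-in-time copy: later adds and removes do not affect the returned array.
    public DemoEntry[] Snapshot() => _sessions.Values.ToArray();

    public int Count => _sessions.Count;

    // Excludes entries that are closed but not yet removed.
    public int ActiveCount => _sessions.Values.Count(entry => !entry.IsClosed);
}
```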

SessionWorkerClientFactory (ISessionWorkerClientFactory)

SessionWorkerClientFactory.CreateAsync is the only path that builds an IWorkerClient. It drives the session through the protobuf SessionState substates in order: StartingWorker, WaitingForPipe, Handshaking, InitializingWorker. The substates are wire-visible so the dashboard and clients can render startup progress.

A linked CancellationTokenSource enforces session.StartupTimeout. If startup fails or times out, the factory either kills the partially-constructed WorkerClient or, if the client was never built, kills the launched process and disposes the named pipe before rethrowing. A pure timeout is rewritten as TimeoutException so callers can distinguish it from caller-driven cancellation:

if (exception is OperationCanceledException
    && startupCancellation.IsCancellationRequested
    && !cancellationToken.IsCancellationRequested)
{
    throw new TimeoutException(
        $"Worker session {session.SessionId} did not complete startup within {session.StartupTimeout}.",
        exception);
}
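The surrounding pattern, a linked token source with a deadline plus the timeout rewrite, might look like the following sketch. StartupTimeoutDemo and its parameter names are hypothetical; the startup delegate stands in for the worker launch, pipe wait, and handshake steps.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StartupTimeoutDemo
{
    public static async Task RunWithStartupTimeoutAsync(
        Func<CancellationToken, Task> startup,
        TimeSpan startupTimeout,
        CancellationToken cancellationToken)
    {
        // The linked source fires on caller cancellation or when the deadline passes.
        using var startupCancellation =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        startupCancellation.CancelAfter(startupTimeout);

        try
        {
            await startup(startupCancellation.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException exception)
            when (startupCancellation.IsCancellationRequested
                  && !cancellationToken.IsCancellationRequested)
        {
            // Only the deadline fired, so report a timeout rather than a cancellation.
            throw new TimeoutException(
                $"Startup did not complete within {startupTimeout}.",
                exception);
        }
    }
}
```

When the caller's own token is cancelled, the filter does not match and the OperationCanceledException propagates unchanged.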

The named pipe is created with maxNumberOfServerInstances: 1 so a second worker cannot connect to the same pipe name even if the first launch is still pending. Combined with the per-session nonce passed to the worker, this is the gateway's defense against a foreign process answering a pipe.

SessionShutdownHostedService

SessionShutdownHostedService is an IHostedService whose only job is to call ISessionManager.ShutdownAsync from StopAsync. It catches OperationCanceledException triggered by the host shutdown timeout and logs a warning so that an over-running shutdown does not surface as an unhandled exception.

SessionOpenRequest

SessionOpenRequest is the gateway-internal record passed to OpenSessionAsync. It is constructed from the wire-level OpenSessionRequest via SessionOpenRequest.FromContract. Keeping a separate internal record means the gRPC layer can normalize input (defaulting backend, sanitizing strings) without leaking generated proto types into SessionManager.

public sealed record SessionOpenRequest(
    string? RequestedBackend,
    string? ClientSessionName,
    string? ClientCorrelationId,
    Duration? CommandTimeout)
{
    public static SessionOpenRequest FromContract(OpenSessionRequest request)
    {
        ArgumentNullException.ThrowIfNull(request);

        return new SessionOpenRequest(
            request.RequestedBackend,
            request.ClientSessionName,
            request.ClientCorrelationId,
            request.CommandTimeout);
    }
}

SessionCloseResult

SessionCloseResult is the record returned from a successful close. AlreadyClosed distinguishes "this call closed the session" from "the session was already closed when we acquired the close lock", which the metrics layer uses to avoid double-counting.

public sealed record SessionCloseResult(
    string SessionId,
    SessionState FinalState,
    bool AlreadyClosed);

SessionCloseStartedException

SessionCloseStartedException is internal and is only thrown from inside GatewaySession.CloseAsync when the close path has already begun mutating worker state and a subsequent step fails. SessionManager.CloseSessionCoreAsync catches it, marks the session faulted, increments the close-failed metric, removes the session from the registry, and rethrows it wrapped as SessionManagerException with CloseFailed. The intermediate type exists so the public API surface only ever exposes SessionManagerException.
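The translation from the internal exception to the public one can be sketched as below. All type names here (DemoErrorCode, DemoManagerException, DemoCloseStartedException, CloseTranslator) are hypothetical stand-ins, and the metric, fault-marking, and registry-removal steps are omitted.

```csharp
using System;

public enum DemoErrorCode { CloseFailed }

// Stand-in for the single public error type.
public sealed class DemoManagerException : Exception
{
    public DemoManagerException(DemoErrorCode errorCode, string message, Exception inner)
        : base(message, inner) => ErrorCode = errorCode;

    public DemoErrorCode ErrorCode { get; }
}

// Stand-in for the internal marker exception; never escapes the subsystem.
internal sealed class DemoCloseStartedException : Exception
{
    public DemoCloseStartedException(string message, Exception inner)
        : base(message, inner) { }
}

public static class CloseTranslator
{
    public static void Close(Action closeCore)
    {
        try
        {
            closeCore();
        }
        catch (DemoCloseStartedException exception)
        {
            // Wrap the internal type so callers only ever see the public one.
            throw new DemoManagerException(
                DemoErrorCode.CloseFailed,
                "Close started but did not complete cleanly.",
                exception);
        }
    }
}
```

The original failure stays reachable through InnerException, so no diagnostic detail is lost in the wrap.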

SessionManagerException and SessionManagerErrorCode

SessionManagerException is the single public error type emitted from this subsystem; the code is carried in the ErrorCode property and is also surfaced to metrics tags via SessionManagerErrorCode.ToString().

| Code | Meaning |
| --- | --- |
| SessionNotFound | The session id is not in the registry. |
| SessionNotReady | The session or its IWorkerClient is not in Ready state. |
| EventSubscriberAlreadyActive | A second event subscriber attached when only one is allowed. |
| EventQueueOverflow | Reserved for the worker event channel overflow path. |
| SessionLimitExceeded | All MaxSessions slots are in use. |
| OpenFailed | OpenSessionAsync failed; the inner exception carries the cause. |
| CloseFailed | A close started but did not complete cleanly; the session is removed and faulted. |

Lifecycle

Open

SessionManager.OpenSessionAsync allocates a session slot, builds the GatewaySession, registers it, and asks the factory to bring up the worker. Failures roll back every preceding step:

catch (Exception exception)
{
    session?.MarkFaulted(exception.Message);
    if (session is not null)
    {
        _registry.TryRemove(session.SessionId, out _);
        await session.DisposeAsync().ConfigureAwait(false);
    }

    ReleaseSessionSlot();
    _metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
    _logger.LogWarning(
        exception,
        "Failed to open gateway session {SessionId}.",
        session?.SessionId ?? "<not-created>");

    throw new SessionManagerException(
        SessionManagerErrorCode.OpenFailed,
        session is null ? "Failed to create session." : $"Failed to open session {session.SessionId}.",
        exception);
}

The order — fault, deregister, dispose, release slot, record metric, log, rethrow — matters because releasing the semaphore before disposal would let the next open race the worker process tear-down on the same machine.

Run

While Ready, callers reach the worker through SessionManager.InvokeAsync or ReadEventsAsync. Both delegate to GatewaySession, which checks the state under lock and updates LastClientActivityAt on every invocation. GatewaySession also exposes typed bulk helpers (AddItemBulkAsync, SubscribeBulkAsync, etc.) that wrap WorkerCommand round-trips and translate non-Ok ProtocolStatus replies into SessionManagerException with SessionNotReady.

Event streaming uses AttachEventSubscriber which returns a disposable lease. When allowMultipleSubscribers is false the second attach throws EventSubscriberAlreadyActive; this prevents two gRPC streams from racing on the same worker event channel. Active event subscribers keep the session lease from expiring until the stream is disposed.
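One way to implement a single-subscriber disposable lease is an atomic flag that the lease clears on dispose. The sketch below is an assumption about the general technique, not the gateway's implementation; SingleSubscriberGate is a hypothetical name and it throws InvalidOperationException where the real code throws SessionManagerException.

```csharp
#nullable enable
using System;
using System.Threading;

public sealed class SingleSubscriberGate
{
    private int _active; // 0 = free, 1 = taken

    public IDisposable Attach()
    {
        // Atomically claim the slot; fail if another subscriber already holds it.
        if (Interlocked.CompareExchange(ref _active, 1, 0) != 0)
        {
            throw new InvalidOperationException("An event subscriber is already active.");
        }

        return new Lease(this);
    }

    private sealed class Lease : IDisposable
    {
        private SingleSubscriberGate? _owner;

        public Lease(SingleSubscriberGate owner) => _owner = owner;

        public void Dispose()
        {
            // Clearing _owner first makes a repeated Dispose a no-op,
            // so a stale lease cannot free a slot held by a newer subscriber.
            SingleSubscriberGate? owner = Interlocked.Exchange(ref _owner, null);
            if (owner is not null)
            {
                Interlocked.Exchange(ref owner._active, 0);
            }
        }
    }
}
```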

Sessions open with MxGateway:Sessions:DefaultLeaseSeconds (default 1800) added to the open timestamp. Unary client activity refreshes the lease by the same duration. ExtendLease and IsLeaseExpired cooperate with SessionManager.CloseExpiredLeasesAsync, which iterates a registry snapshot and closes any session whose lease has expired with LeaseExpiredReason. SessionLeaseMonitorHostedService runs that sweep every MxGateway:Sessions:LeaseSweepIntervalSeconds seconds (default 30).
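The lease arithmetic can be sketched with an explicit clock parameter, which also makes it easy to test. LeaseClock is a hypothetical type; treating a lease as expired exactly at its expiry instant (an inclusive comparison) is an assumption the document does not confirm.

```csharp
using System;

public sealed class LeaseClock
{
    private readonly object _syncRoot = new();
    private readonly TimeSpan _leaseDuration;
    private DateTimeOffset _expiresAt;

    public LeaseClock(DateTimeOffset openedAt, TimeSpan leaseDuration)
    {
        _leaseDuration = leaseDuration;
        _expiresAt = openedAt + leaseDuration;
    }

    // Client activity pushes the expiry out by the full lease duration from "now".
    public void ExtendLease(DateTimeOffset now)
    {
        lock (_syncRoot) { _expiresAt = now + _leaseDuration; }
    }

    // Assumption: the lease counts as expired at the expiry instant itself.
    public bool IsLeaseExpired(DateTimeOffset now)
    {
        lock (_syncRoot) { return now >= _expiresAt; }
    }
}
```

With the default of 1800 seconds, a session opened at time t and last used at t + 1000 s would be swept on the first pass at or after t + 2800 s.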

Close

GatewaySession.CloseAsync is serialized by a per-session SemaphoreSlim (_closeLock) so only one close runs at a time, but every read/write of _state still passes through _syncRoot (via TryBeginClose and MarkClosed). The close path therefore obeys the same lock discipline as TransitionTo / MarkFaulted: it transitions to Closing, asks the worker client to shut down within ShutdownTimeout, and on success transitions to Closed. DisposeAsync waits on _closeLock once before disposing the semaphore so an in-flight close's Release() cannot race against the dispose. If WorkerClient.ShutdownAsync throws, the session falls back to IWorkerClient.Kill (forced close):

if (_workerClient is not null)
{
    try
    {
        await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception exception)
    {
        try
        {
            _workerClient.Kill(reason);
        }
        catch (Exception killException)
        {
            throw new SessionCloseStartedException(
                $"Session {SessionId} close failed after worker shutdown started.",
                new AggregateException(exception, killException));
        }

        throw;
    }
}

If both graceful shutdown and the kill fall-back fail, the original and kill exceptions are bundled into an AggregateException and surfaced as SessionCloseStartedException. SessionManager.CloseSessionCoreAsync then translates that into a SessionManagerException with CloseFailed and removes the session.

GatewaySession.KillWorker is the unconditional forced-close path used by shutdown when graceful close itself throws.
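The interplay between the close semaphore and the state lock, and how an already-closed result arises, can be sketched as follows. DemoCloser is hypothetical; the shutdownWorker delegate stands in for the worker shutdown call, and the boolean return value plays the role of SessionCloseResult.AlreadyClosed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class DemoCloser
{
    private readonly SemaphoreSlim _closeLock = new(1, 1); // serializes close attempts
    private readonly object _syncRoot = new();             // guards the state itself
    private bool _closed;

    public int ShutdownCalls { get; private set; }

    // Returns true when the session was already closed by an earlier call.
    public async Task<bool> CloseAsync(Func<Task> shutdownWorker)
    {
        await _closeLock.WaitAsync().ConfigureAwait(false);
        try
        {
            lock (_syncRoot)
            {
                if (_closed) return true;
            }

            await shutdownWorker().ConfigureAwait(false);
            ShutdownCalls++; // safe: only one close runs at a time

            lock (_syncRoot) { _closed = true; }
            return false;
        }
        finally
        {
            _closeLock.Release();
        }
    }
}
```

Two racing callers both complete successfully, but only the first one reaches the worker; the second observes the closed state once it acquires the semaphore.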

Shutdown Coordination

SessionShutdownHostedService.StopAsync calls SessionManager.ShutdownAsync, which closes every registered session with GatewayShutdownReason. The shutdown loop catches per-session exceptions, calls KillWorker, and removes the session so that one stuck worker cannot block the rest of the host:

public async Task ShutdownAsync(CancellationToken cancellationToken)
{
    foreach (GatewaySession session in _registry.Snapshot())
    {
        try
        {
            await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            _logger.LogWarning(
                exception,
                "Graceful shutdown failed for session {SessionId}; killing worker.",
                session.SessionId);
            if (_registry.TryGet(session.SessionId, out _))
            {
                session.KillWorker(GatewayShutdownReason);
                await RemoveSessionAsync(session).ConfigureAwait(false);
            }
        }
    }
}

Iterating over Snapshot rather than the live dictionary lets RemoveSessionAsync mutate the registry inside the loop without throwing.
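The failure-isolation idea in the shutdown loop can be reduced to the sketch below. ShutdownDemo is hypothetical: each delegate stands in for a graceful session close, and the returned counter stands in for the KillWorker fallback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ShutdownDemo
{
    // Returns how many sessions needed the forced path.
    public static async Task<int> DrainAsync(
        ConcurrentDictionary<string, Func<Task>> sessions)
    {
        int forced = 0;

        // ToArray takes a point-in-time copy, so removals below do not affect the loop.
        foreach (var pair in sessions.ToArray())
        {
            try
            {
                await pair.Value().ConfigureAwait(false);
            }
            catch (Exception)
            {
                // One failing session is counted and skipped; the loop continues.
                forced++;
            }

            sessions.TryRemove(pair.Key, out _);
        }

        return forced;
    }
}
```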

Dependency Injection

SessionServiceCollectionExtensions.AddGatewaySessions registers the three singletons and the hosted service:

public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
    services.AddSingleton<ISessionRegistry, SessionRegistry>();
    services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
    services.AddSingleton<ISessionManager, SessionManager>();
    services.AddHostedService<SessionShutdownHostedService>();

    return services;
}

The registry must be a singleton because its ConcurrentDictionary is the source of truth for session state across the gRPC service, the lease sweeper, the dashboard, and the shutdown hosted service. SessionShutdownHostedService depends on ISessionManager, so the container resolves the manager before constructing the hosted service regardless of registration order; the hosted service then drains sessions from StopAsync during host stop.