From d496f1fd751bfee4270caf9187ad55dd39505c1b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 17:29:36 -0400 Subject: [PATCH] Issue #12: implement session manager and registry --- docs/gateway-process-design.md | 14 + gateway.md | 6 + src/MxGateway.Server/GatewayApplication.cs | 2 + .../Sessions/GatewaySession.cs | 290 ++++++++++++++++ .../Sessions/ISessionManager.cs | 34 ++ .../Sessions/ISessionRegistry.cs | 16 + .../Sessions/ISessionWorkerClientFactory.cs | 8 + .../Sessions/SessionCloseResult.cs | 8 + .../Sessions/SessionManager.cs | 287 ++++++++++++++++ .../Sessions/SessionManagerErrorCode.cs | 10 + .../Sessions/SessionManagerException.cs | 23 ++ .../Sessions/SessionOpenRequest.cs | 22 ++ .../Sessions/SessionRegistry.cs | 39 +++ .../SessionServiceCollectionExtensions.cs | 13 + .../Sessions/SessionWorkerClientFactory.cs | 144 ++++++++ .../Gateway/Sessions/SessionManagerTests.cs | 320 ++++++++++++++++++ 16 files changed, 1236 insertions(+) create mode 100644 src/MxGateway.Server/Sessions/GatewaySession.cs create mode 100644 src/MxGateway.Server/Sessions/ISessionManager.cs create mode 100644 src/MxGateway.Server/Sessions/ISessionRegistry.cs create mode 100644 src/MxGateway.Server/Sessions/ISessionWorkerClientFactory.cs create mode 100644 src/MxGateway.Server/Sessions/SessionCloseResult.cs create mode 100644 src/MxGateway.Server/Sessions/SessionManager.cs create mode 100644 src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs create mode 100644 src/MxGateway.Server/Sessions/SessionManagerException.cs create mode 100644 src/MxGateway.Server/Sessions/SessionOpenRequest.cs create mode 100644 src/MxGateway.Server/Sessions/SessionRegistry.cs create mode 100644 src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs create mode 100644 src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs create mode 100644 src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index a300590..49979bf 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -330,6 +330,20 @@ The worker remains authoritative for MXAccess handles. The gateway may keep a shadow state for diagnostics, but it must not invent, rewrite, or recycle MXAccess handles. +`SessionManager` owns the current in-memory session registry. It allocates a +session id, creates the worker pipe name and nonce, registers the session before +worker startup, and removes the session if startup fails. A successful +`OpenSession` attaches the ready `IWorkerClient` and transitions the session to +`Ready`. + +Only `Ready` sessions accept command and event operations. `CloseSession` is +idempotent for sessions still known to the registry: the first close shuts down +the worker, and later closes return the final `Closed` state. Lease handling is +exposed as a session hook so a monitor can close expired sessions without +embedding lease policy in the worker client. Gateway shutdown walks the +registry, closes each known session, and kills a worker if graceful shutdown +fails. + ## Worker Launch The gateway should launch the worker using explicit configuration: diff --git a/gateway.md b/gateway.md index 3bc2c47..7261d3d 100644 --- a/gateway.md +++ b/gateway.md @@ -799,6 +799,12 @@ Core operations: - track worker state, - close or kill worker. +The gateway implementation keeps sessions in an in-memory `SessionRegistry` +keyed by session id. `SessionManager` owns the state machine, creates +per-session pipe names and nonces, starts the worker through the worker-client +factory, gates commands to `Ready` sessions, exposes lease-close hooks, and +cleans up workers during gateway shutdown. + State machine: ```text diff --git a/src/MxGateway.Server/GatewayApplication.cs b/src/MxGateway.Server/GatewayApplication.cs index 0215706..1e03b37 100644 --- a/src/MxGateway.Server/GatewayApplication.cs +++ b/src/MxGateway.Server/GatewayApplication.cs @@ -4,6 +4,7 @@ using MxGateway.Server.Diagnostics; using MxGateway.Server.Metrics; using MxGateway.Server.Security.Authentication; using MxGateway.Server.Security.Authorization; +using MxGateway.Server.Sessions; using MxGateway.Server.Workers; namespace MxGateway.Server; @@ -31,6 +32,7 @@ public static class GatewayApplication builder.Services.AddHealthChecks(); builder.Services.AddSingleton(); builder.Services.AddWorkerProcessLauncher(); + builder.Services.AddGatewaySessions(); return builder; } diff --git a/src/MxGateway.Server/Sessions/GatewaySession.cs b/src/MxGateway.Server/Sessions/GatewaySession.cs new file mode 100644 index 0000000..d010669 --- /dev/null +++ b/src/MxGateway.Server/Sessions/GatewaySession.cs @@ -0,0 +1,290 @@ +using MxGateway.Contracts.Proto; +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Sessions; + +public sealed class GatewaySession +{ + private readonly object _syncRoot = new(); + private readonly SemaphoreSlim _closeLock = new(1, 1); + private IWorkerClient? _workerClient; + private SessionState _state = SessionState.Creating; + private string? _finalFault; + private DateTimeOffset _lastClientActivityAt; + private DateTimeOffset? _leaseExpiresAt; + private bool _closeStarted; + + public GatewaySession( + string sessionId, + string backendName, + string pipeName, + string nonce, + string? clientIdentity, + string? clientSessionName, + string? clientCorrelationId, + TimeSpan commandTimeout, + TimeSpan startupTimeout, + TimeSpan shutdownTimeout, + DateTimeOffset openedAt) + { + if (string.IsNullOrWhiteSpace(sessionId)) + { + throw new ArgumentException("Session id is required.", nameof(sessionId)); + } + + if (string.IsNullOrWhiteSpace(backendName)) + { + throw new ArgumentException("Backend name is required.", nameof(backendName)); + } + + if (string.IsNullOrWhiteSpace(pipeName)) + { + throw new ArgumentException("Pipe name is required.", nameof(pipeName)); + } + + if (string.IsNullOrWhiteSpace(nonce)) + { + throw new ArgumentException("Nonce is required.", nameof(nonce)); + } + + SessionId = sessionId; + BackendName = backendName; + PipeName = pipeName; + Nonce = nonce; + ClientIdentity = clientIdentity; + ClientSessionName = clientSessionName; + ClientCorrelationId = clientCorrelationId; + CommandTimeout = commandTimeout; + StartupTimeout = startupTimeout; + ShutdownTimeout = shutdownTimeout; + OpenedAt = openedAt; + _lastClientActivityAt = openedAt; + } + + public string SessionId { get; } + + public string BackendName { get; } + + public string PipeName { get; } + + public string Nonce { get; } + + public string? ClientIdentity { get; } + + public string? ClientSessionName { get; } + + public string? ClientCorrelationId { get; } + + public TimeSpan CommandTimeout { get; } + + public TimeSpan StartupTimeout { get; } + + public TimeSpan ShutdownTimeout { get; } + + public DateTimeOffset OpenedAt { get; } + + public int? WorkerProcessId => _workerClient?.ProcessId; + + public IWorkerClient? WorkerClient => _workerClient; + + public SessionState State + { + get + { + lock (_syncRoot) + { + return _state; + } + } + } + + public DateTimeOffset LastClientActivityAt + { + get + { + lock (_syncRoot) + { + return _lastClientActivityAt; + } + } + } + + public DateTimeOffset? LeaseExpiresAt + { + get + { + lock (_syncRoot) + { + return _leaseExpiresAt; + } + } + } + + public string? FinalFault + { + get + { + lock (_syncRoot) + { + return _finalFault; + } + } + } + + public void AttachWorkerClient(IWorkerClient workerClient) + { + ArgumentNullException.ThrowIfNull(workerClient); + + lock (_syncRoot) + { + _workerClient = workerClient; + } + } + + public void TransitionTo(SessionState nextState) + { + lock (_syncRoot) + { + if (_state is SessionState.Closed) + { + return; + } + + if (_state is SessionState.Faulted && nextState is not SessionState.Closed) + { + return; + } + + _state = nextState; + } + } + + public void MarkReady() + { + TransitionTo(SessionState.Ready); + } + + public void MarkFaulted(string reason) + { + lock (_syncRoot) + { + if (_state is SessionState.Closed) + { + return; + } + + _finalFault = reason; + _state = SessionState.Faulted; + } + } + + public void TouchClientActivity(DateTimeOffset activityAt) + { + lock (_syncRoot) + { + _lastClientActivityAt = activityAt; + } + } + + public void ExtendLease(DateTimeOffset leaseExpiresAt) + { + lock (_syncRoot) + { + _leaseExpiresAt = leaseExpiresAt; + } + } + + public bool IsLeaseExpired(DateTimeOffset now) + { + lock (_syncRoot) + { + return _leaseExpiresAt is not null && _leaseExpiresAt <= now; + } + } + + public async Task InvokeAsync( + WorkerCommand command, + CancellationToken cancellationToken) + { + IWorkerClient workerClient = GetReadyWorkerClient(); + TouchClientActivity(DateTimeOffset.UtcNow); + + return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false); + } + + public IAsyncEnumerable ReadEventsAsync(CancellationToken cancellationToken) + { + IWorkerClient workerClient = GetReadyWorkerClient(); + TouchClientActivity(DateTimeOffset.UtcNow); + + return workerClient.ReadEventsAsync(cancellationToken); + } + + public async Task CloseAsync( + string reason, + CancellationToken cancellationToken) + { + await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + if (_state is SessionState.Closed) + { + return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true); + } + + bool alreadyClosing = _closeStarted; + _closeStarted = true; + _state = SessionState.Closing; + + if (_workerClient is not null) + { + try + { + await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false); + } + catch + { + _workerClient.Kill(reason); + throw; + } + } + + _state = SessionState.Closed; + return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing); + } + finally + { + _closeLock.Release(); + } + } + + public void KillWorker(string reason) + { + _workerClient?.Kill(reason); + TransitionTo(SessionState.Closed); + } + + public async ValueTask DisposeAsync() + { + _closeLock.Dispose(); + if (_workerClient is not null) + { + await _workerClient.DisposeAsync().ConfigureAwait(false); + } + } + + private IWorkerClient GetReadyWorkerClient() + { + lock (_syncRoot) + { + if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready) + { + throw new SessionManagerException( + SessionManagerErrorCode.SessionNotReady, + $"Session {SessionId} is not ready. Current state is {_state}."); + } + + return _workerClient; + } + } +} diff --git a/src/MxGateway.Server/Sessions/ISessionManager.cs b/src/MxGateway.Server/Sessions/ISessionManager.cs new file mode 100644 index 0000000..18de3f4 --- /dev/null +++ b/src/MxGateway.Server/Sessions/ISessionManager.cs @@ -0,0 +1,34 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Sessions; + +public interface ISessionManager +{ + Task OpenSessionAsync( + SessionOpenRequest request, + string? clientIdentity, + CancellationToken cancellationToken); + + bool TryGetSession( + string sessionId, + out GatewaySession session); + + Task InvokeAsync( + string sessionId, + WorkerCommand command, + CancellationToken cancellationToken); + + IAsyncEnumerable ReadEventsAsync( + string sessionId, + CancellationToken cancellationToken); + + Task CloseSessionAsync( + string sessionId, + CancellationToken cancellationToken); + + Task CloseExpiredLeasesAsync( + DateTimeOffset now, + CancellationToken cancellationToken); + + Task ShutdownAsync(CancellationToken cancellationToken); +} diff --git a/src/MxGateway.Server/Sessions/ISessionRegistry.cs b/src/MxGateway.Server/Sessions/ISessionRegistry.cs new file mode 100644 index 0000000..9f66f83 --- /dev/null +++ b/src/MxGateway.Server/Sessions/ISessionRegistry.cs @@ -0,0 +1,16 @@ +namespace MxGateway.Server.Sessions; + +public interface ISessionRegistry +{ + int Count { get; } + + int ActiveCount { get; } + + bool TryAdd(GatewaySession session); + + bool TryGet(string sessionId, out GatewaySession session); + + bool TryRemove(string sessionId, out GatewaySession session); + + IReadOnlyCollection Snapshot(); +} diff --git a/src/MxGateway.Server/Sessions/ISessionWorkerClientFactory.cs b/src/MxGateway.Server/Sessions/ISessionWorkerClientFactory.cs new file mode 100644 index 0000000..8268dbf --- /dev/null +++ b/src/MxGateway.Server/Sessions/ISessionWorkerClientFactory.cs @@ -0,0 +1,8 @@ +namespace MxGateway.Server.Sessions; + +public interface ISessionWorkerClientFactory +{ + Task CreateAsync( + GatewaySession session, + CancellationToken cancellationToken); +} diff --git a/src/MxGateway.Server/Sessions/SessionCloseResult.cs b/src/MxGateway.Server/Sessions/SessionCloseResult.cs new file mode 100644 index 0000000..951c788 --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionCloseResult.cs @@ -0,0 +1,8 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Sessions; + +public sealed record SessionCloseResult( + string SessionId, + SessionState FinalState, + bool AlreadyClosed); diff --git a/src/MxGateway.Server/Sessions/SessionManager.cs b/src/MxGateway.Server/Sessions/SessionManager.cs new file mode 100644 index 0000000..54272bb --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionManager.cs @@ -0,0 +1,287 @@ +using System.Security.Cryptography; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Sessions; + +public sealed class SessionManager : ISessionManager +{ + public const string DefaultCloseReason = "client-close"; + public const string GatewayShutdownReason = "gateway-shutdown"; + public const string LeaseExpiredReason = "lease-expired"; + + private readonly ISessionRegistry _registry; + private readonly ISessionWorkerClientFactory _workerClientFactory; + private readonly GatewayMetrics _metrics; + private readonly TimeProvider _timeProvider; + private readonly ILogger _logger; + private readonly GatewayOptions _options; + + public SessionManager( + ISessionRegistry registry, + ISessionWorkerClientFactory workerClientFactory, + IOptions options, + GatewayMetrics metrics, + TimeProvider? timeProvider = null, + ILogger? logger = null) + { + _registry = registry ?? throw new ArgumentNullException(nameof(registry)); + _workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory)); + ArgumentNullException.ThrowIfNull(options); + _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); + _timeProvider = timeProvider ?? TimeProvider.System; + _logger = logger ?? NullLogger.Instance; + _options = options.Value; + } + + public async Task OpenSessionAsync( + SessionOpenRequest request, + string? clientIdentity, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(request); + EnsureSessionCapacity(); + + GatewaySession session = CreateSession(request, clientIdentity); + if (!_registry.TryAdd(session)) + { + throw new SessionManagerException( + SessionManagerErrorCode.OpenFailed, + $"Session id collision while opening session {session.SessionId}."); + } + + try + { + session.TransitionTo(SessionState.StartingWorker); + IWorkerClient workerClient = await _workerClientFactory + .CreateAsync(session, cancellationToken) + .ConfigureAwait(false); + + session.AttachWorkerClient(workerClient); + session.MarkReady(); + _metrics.SessionOpened(); + + return session; + } + catch (Exception exception) + { + session.MarkFaulted(exception.Message); + _registry.TryRemove(session.SessionId, out _); + await session.DisposeAsync().ConfigureAwait(false); + _metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString()); + _logger.LogWarning( + exception, + "Failed to open gateway session {SessionId}.", + session.SessionId); + + throw new SessionManagerException( + SessionManagerErrorCode.OpenFailed, + $"Failed to open session {session.SessionId}.", + exception); + } + } + + public bool TryGetSession( + string sessionId, + out GatewaySession session) + { + return _registry.TryGet(sessionId, out session); + } + + public async Task InvokeAsync( + string sessionId, + WorkerCommand command, + CancellationToken cancellationToken) + { + GatewaySession session = GetRequiredSession(sessionId); + + try + { + return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false); + } + catch (SessionManagerException) + { + throw; + } + catch (Exception exception) + { + if (session.WorkerClient?.State == WorkerClientState.Faulted) + { + session.MarkFaulted(exception.Message); + } + + throw; + } + } + + public IAsyncEnumerable ReadEventsAsync( + string sessionId, + CancellationToken cancellationToken) + { + GatewaySession session = GetRequiredSession(sessionId); + + return session.ReadEventsAsync(cancellationToken); + } + + public async Task CloseSessionAsync( + string sessionId, + CancellationToken cancellationToken) + { + GatewaySession session = GetRequiredSession(sessionId); + SessionCloseResult result = await CloseSessionCoreAsync( + session, + DefaultCloseReason, + cancellationToken).ConfigureAwait(false); + + return result; + } + + public async Task CloseExpiredLeasesAsync( + DateTimeOffset now, + CancellationToken cancellationToken) + { + int closedCount = 0; + foreach (GatewaySession session in _registry.Snapshot()) + { + if (!session.IsLeaseExpired(now)) + { + continue; + } + + await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false); + closedCount++; + } + + return closedCount; + } + + public async Task ShutdownAsync(CancellationToken cancellationToken) + { + foreach (GatewaySession session in _registry.Snapshot()) + { + try + { + await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.LogWarning( + exception, + "Graceful shutdown failed for session {SessionId}; killing worker.", + session.SessionId); + session.KillWorker(GatewayShutdownReason); + } + } + } + + private async Task CloseSessionCoreAsync( + GatewaySession session, + string reason, + CancellationToken cancellationToken) + { + bool wasClosed = session.State == SessionState.Closed; + try + { + SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false); + if (!wasClosed && !result.AlreadyClosed) + { + _metrics.SessionClosed(); + } + + return result; + } + catch (Exception exception) + { + session.MarkFaulted(exception.Message); + _metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString()); + throw new SessionManagerException( + SessionManagerErrorCode.CloseFailed, + $"Failed to close session {session.SessionId}.", + exception); + } + } + + private GatewaySession GetRequiredSession(string sessionId) + { + if (!_registry.TryGet(sessionId, out GatewaySession session)) + { + throw new SessionManagerException( + SessionManagerErrorCode.SessionNotFound, + $"Session {sessionId} was not found."); + } + + return session; + } + + private void EnsureSessionCapacity() + { + if (_registry.ActiveCount >= _options.Sessions.MaxSessions) + { + throw new SessionManagerException( + SessionManagerErrorCode.SessionLimitExceeded, + $"Gateway session limit {_options.Sessions.MaxSessions} has been reached."); + } + } + + private GatewaySession CreateSession( + SessionOpenRequest request, + string? clientIdentity) + { + string sessionId = CreateSessionId(); + string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend) + ? GatewayContractInfo.DefaultBackendName + : request.RequestedBackend!; + TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout); + TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds); + TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds); + string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}"; + string nonce = CreateNonce(); + DateTimeOffset openedAt = _timeProvider.GetUtcNow(); + + return new GatewaySession( + sessionId, + backendName, + pipeName, + nonce, + clientIdentity, + request.ClientSessionName, + request.ClientCorrelationId, + commandTimeout, + startupTimeout, + shutdownTimeout, + openedAt); + } + + private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout) + { + if (requestedTimeout is null) + { + return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds); + } + + TimeSpan timeout = requestedTimeout.ToTimeSpan(); + return timeout <= TimeSpan.Zero + ? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds) + : timeout; + } + + private static string CreateSessionId() + { + return $"session-{Guid.NewGuid():N}"; + } + + private static string CreateNonce() + { + Span bytes = stackalloc byte[32]; + RandomNumberGenerator.Fill(bytes); + + return Convert.ToBase64String(bytes); + } +} diff --git a/src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs b/src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs new file mode 100644 index 0000000..dcbca45 --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs @@ -0,0 +1,10 @@ +namespace MxGateway.Server.Sessions; + +public enum SessionManagerErrorCode +{ + SessionNotFound, + SessionNotReady, + SessionLimitExceeded, + OpenFailed, + CloseFailed, +} diff --git a/src/MxGateway.Server/Sessions/SessionManagerException.cs b/src/MxGateway.Server/Sessions/SessionManagerException.cs new file mode 100644 index 0000000..9a87327 --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionManagerException.cs @@ -0,0 +1,23 @@ +namespace MxGateway.Server.Sessions; + +public sealed class SessionManagerException : Exception +{ + public SessionManagerException( + SessionManagerErrorCode errorCode, + string message) + : base(message) + { + ErrorCode = errorCode; + } + + public SessionManagerException( + SessionManagerErrorCode errorCode, + string message, + Exception innerException) + : base(message, innerException) + { + ErrorCode = errorCode; + } + + public SessionManagerErrorCode ErrorCode { get; } +} diff --git a/src/MxGateway.Server/Sessions/SessionOpenRequest.cs b/src/MxGateway.Server/Sessions/SessionOpenRequest.cs new file mode 100644 index 0000000..8006d98 --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionOpenRequest.cs @@ -0,0 +1,22 @@ +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Sessions; + +public sealed record SessionOpenRequest( + string? RequestedBackend, + string? ClientSessionName, + string? ClientCorrelationId, + Duration? CommandTimeout) +{ + public static SessionOpenRequest FromContract(OpenSessionRequest request) + { + ArgumentNullException.ThrowIfNull(request); + + return new SessionOpenRequest( + request.RequestedBackend, + request.ClientSessionName, + request.ClientCorrelationId, + request.CommandTimeout); + } +} diff --git a/src/MxGateway.Server/Sessions/SessionRegistry.cs b/src/MxGateway.Server/Sessions/SessionRegistry.cs new file mode 100644 index 0000000..4c87f08 --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionRegistry.cs @@ -0,0 +1,39 @@ +using System.Collections.Concurrent; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Sessions; + +public sealed class SessionRegistry : ISessionRegistry +{ + private readonly ConcurrentDictionary _sessions = new(StringComparer.Ordinal); + + public int Count => _sessions.Count; + + public int ActiveCount => _sessions.Values.Count(session => session.State is not SessionState.Closed); + + public bool TryAdd(GatewaySession session) + { + ArgumentNullException.ThrowIfNull(session); + + return _sessions.TryAdd(session.SessionId, session); + } + + public bool TryGet( + string sessionId, + out GatewaySession session) + { + return _sessions.TryGetValue(sessionId, out session!); + } + + public bool TryRemove( + string sessionId, + out GatewaySession session) + { + return _sessions.TryRemove(sessionId, out session!); + } + + public IReadOnlyCollection Snapshot() + { + return _sessions.Values.ToArray(); + } +} diff --git a/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs new file mode 100644 index 0000000..2510836 --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs @@ -0,0 +1,13 @@ +namespace MxGateway.Server.Sessions; + +public static class SessionServiceCollectionExtensions +{ + public static IServiceCollection AddGatewaySessions(this IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + return services; + } +} diff --git a/src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs b/src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs new file mode 100644 index 0000000..574ebdc --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs @@ -0,0 +1,144 @@ +using System.IO.Pipes; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Sessions; + +public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory +{ + private readonly IWorkerProcessLauncher _workerProcessLauncher; + private readonly GatewayMetrics _metrics; + private readonly TimeProvider _timeProvider; + private readonly ILoggerFactory _loggerFactory; + private readonly GatewayOptions _options; + + public SessionWorkerClientFactory( + IWorkerProcessLauncher workerProcessLauncher, + IOptions options, + GatewayMetrics metrics, + ILoggerFactory loggerFactory, + TimeProvider? timeProvider = null) + { + _workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher)); + ArgumentNullException.ThrowIfNull(options); + _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); + _timeProvider = timeProvider ?? TimeProvider.System; + _options = options.Value; + } + + public async Task CreateAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(session); + + NamedPipeServerStream? pipe = CreatePipe(session.PipeName); + WorkerProcessHandle? processHandle = null; + IWorkerClient? workerClient = null; + try + { + session.TransitionTo(SessionState.StartingWorker); + processHandle = await _workerProcessLauncher + .LaunchAsync( + new WorkerProcessLaunchRequest( + session.SessionId, + session.PipeName, + GatewayContractInfo.WorkerProtocolVersion, + session.Nonce, + pipe), + cancellationToken) + .ConfigureAwait(false); + + session.TransitionTo(SessionState.WaitingForPipe); + await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false); + + session.TransitionTo(SessionState.Handshaking); + WorkerFrameProtocolOptions frameOptions = new( + session.SessionId, + GatewayContractInfo.WorkerProtocolVersion, + _options.Worker.MaxMessageBytes); + WorkerClientConnection connection = new( + session.SessionId, + session.Nonce, + pipe, + frameOptions, + processHandle); + WorkerClientOptions clientOptions = new() + { + HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds), + HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds), + EventChannelCapacity = _options.Events.QueueCapacity, + }; + + workerClient = new WorkerClient( + connection, + clientOptions, + _metrics, + _timeProvider, + _loggerFactory.CreateLogger()); + + pipe = null; + processHandle = null; + + session.TransitionTo(SessionState.InitializingWorker); + await workerClient.StartAsync(cancellationToken).ConfigureAwait(false); + + return workerClient; + } + catch + { + if (workerClient is not null) + { + await workerClient.DisposeAsync().ConfigureAwait(false); + } + else + { + if (processHandle is not null) + { + try + { + if (!processHandle.Process.HasExited) + { + processHandle.Process.Kill(entireProcessTree: true); + _metrics.WorkerKilled("OpenSessionFailed"); + } + } + finally + { + processHandle.Dispose(); + } + } + + pipe?.Dispose(); + } + + throw; + } + } + + private static NamedPipeServerStream CreatePipe(string pipeName) + { + return new NamedPipeServerStream( + pipeName, + PipeDirection.InOut, + maxNumberOfServerInstances: 1, + PipeTransmissionMode.Byte, + PipeOptions.Asynchronous); + } + + private static async Task WaitForPipeConnectionAsync( + NamedPipeServerStream pipe, + TimeSpan startupTimeout, + CancellationToken cancellationToken) + { + using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeout.CancelAfter(startupTimeout); + await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false); + } +} diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs new file mode 100644 index 0000000..9fe3611 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -0,0 +1,320 @@ +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Options; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Sessions; + +public sealed class SessionManagerTests +{ + [Fact] + public async Task OpenSessionAsync_WithWorkerReady_RegistersReadySession() + { + FakeWorkerClient workerClient = new(); + FakeSessionWorkerClientFactory factory = new(workerClient) + { + ApplyLifecycleTransitions = true, + }; + using GatewayMetrics metrics = new(); + SessionManager manager = CreateManager(factory, metrics: metrics); + + GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.True(manager.TryGetSession(session.SessionId, out GatewaySession registered)); + Assert.Same(session, registered); + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal("client-1", session.ClientIdentity); + Assert.Equal(["StartingWorker", "WaitingForPipe", "Handshaking", "InitializingWorker"], factory.ObservedStates); + Assert.Equal(1, metrics.GetSnapshot().OpenSessions); + Assert.Equal(1, metrics.GetSnapshot().SessionsOpened); + } + + [Fact] + public async Task InvokeAsync_WhenSessionReady_ForwardsCommandToWorker() + { + FakeWorkerClient workerClient = new(); + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); + GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + WorkerCommandReply reply = await manager.InvokeAsync( + session.SessionId, + CreateCommand(MxCommandKind.Ping), + CancellationToken.None); + + Assert.Equal(1, workerClient.InvokeCount); + Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind); + } + + [Fact] + public async Task InvokeAsync_WhenSessionFaulted_RejectsCommand() + { + FakeWorkerClient workerClient = new(); + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); + GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + session.MarkFaulted("test fault"); + + SessionManagerException exception = await Assert.ThrowsAsync( + async () => await manager.InvokeAsync( + session.SessionId, + CreateCommand(MxCommandKind.Ping), + CancellationToken.None)); + + Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode); + Assert.Equal(0, workerClient.InvokeCount); + } + + [Fact] + public async Task CloseSessionAsync_WhenCalledTwice_IsIdempotent() + { + FakeWorkerClient workerClient = new(); + using GatewayMetrics metrics = new(); + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics); + GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None); + SessionCloseResult secondClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None); + + Assert.False(firstClose.AlreadyClosed); + Assert.True(secondClose.AlreadyClosed); + Assert.Equal(SessionState.Closed, firstClose.FinalState); + Assert.Equal(SessionState.Closed, secondClose.FinalState); + Assert.Equal(1, workerClient.ShutdownCount); + Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); + Assert.Equal(0, metrics.GetSnapshot().OpenSessions); + } + + [Fact] + public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry() + { + SessionRegistry registry = new(); + using GatewayMetrics metrics = new(); + SessionManager manager = CreateManager( + new FailingSessionWorkerClientFactory(), + registry, + metrics); + + SessionManagerException exception = await Assert.ThrowsAsync( + async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None)); + + Assert.Equal(SessionManagerErrorCode.OpenFailed, exception.ErrorCode); + Assert.Equal(0, registry.Count); + Assert.Equal(0, metrics.GetSnapshot().SessionsOpened); + Assert.Equal(1, metrics.GetSnapshot().Faults); + } + + [Fact] + public async Task CloseExpiredLeasesAsync_ClosesExpiredSessionsOnly() + { + FakeWorkerClient expiredClient = new(); + FakeWorkerClient activeClient = new(); + QueueingSessionWorkerClientFactory factory = new(expiredClient, activeClient); + SessionManager manager = CreateManager(factory); + GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None); + DateTimeOffset now = DateTimeOffset.UtcNow; + expiredSession.ExtendLease(now.AddSeconds(-1)); + activeSession.ExtendLease(now.AddMinutes(5)); + + int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None); + + Assert.Equal(1, closedCount); + Assert.Equal(SessionState.Closed, expiredSession.State); + Assert.Equal(SessionState.Ready, activeSession.State); + Assert.Equal(1, expiredClient.ShutdownCount); + Assert.Equal(0, activeClient.ShutdownCount); + } + + [Fact] + public async Task ShutdownAsync_ClosesAllRegisteredSessions() + { + FakeWorkerClient firstClient = new(); + FakeWorkerClient secondClient = new(); + QueueingSessionWorkerClientFactory factory = new(firstClient, secondClient); + using GatewayMetrics metrics = new(); + SessionManager manager = CreateManager(factory, metrics: metrics); + GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None); + + await manager.ShutdownAsync(CancellationToken.None); + + Assert.Equal(SessionState.Closed, firstSession.State); + Assert.Equal(SessionState.Closed, secondSession.State); + Assert.Equal(1, firstClient.ShutdownCount); + Assert.Equal(1, secondClient.ShutdownCount); + Assert.Equal(2, metrics.GetSnapshot().SessionsClosed); + Assert.Equal(0, metrics.GetSnapshot().OpenSessions); + } + + private static SessionManager CreateManager( + ISessionWorkerClientFactory factory, + ISessionRegistry? registry = null, + GatewayMetrics? metrics = null, + GatewayOptions? options = null) + { + return new SessionManager( + registry ?? new SessionRegistry(), + factory, + Options.Create(options ?? CreateOptions()), + metrics ?? new GatewayMetrics()); + } + + private static GatewayOptions CreateOptions() + { + return new GatewayOptions + { + Sessions = new SessionOptions + { + DefaultCommandTimeoutSeconds = 30, + MaxSessions = 64, + }, + Worker = new WorkerOptions + { + StartupTimeoutSeconds = 30, + ShutdownTimeoutSeconds = 10, + }, + }; + } + + private static SessionOpenRequest CreateOpenRequest() + { + return new SessionOpenRequest( + RequestedBackend: null, + ClientSessionName: "test-session", + ClientCorrelationId: "client-correlation-1", + CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5))); + } + + private static WorkerCommand CreateCommand(MxCommandKind kind) + { + return new WorkerCommand + { + Command = new MxCommand + { + Kind = kind, + }, + }; + } + + private sealed class FakeSessionWorkerClientFactory(IWorkerClient workerClient) : ISessionWorkerClientFactory + { + public List ObservedStates { get; } = []; + + public bool ApplyLifecycleTransitions { get; init; } + + public Task CreateAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + ObservedStates.Add(session.State.ToString()); + if (ApplyLifecycleTransitions) + { + session.TransitionTo(SessionState.WaitingForPipe); + ObservedStates.Add(session.State.ToString()); + session.TransitionTo(SessionState.Handshaking); + ObservedStates.Add(session.State.ToString()); + session.TransitionTo(SessionState.InitializingWorker); + ObservedStates.Add(session.State.ToString()); + } + + return Task.FromResult(workerClient); + } + } + + private sealed class QueueingSessionWorkerClientFactory : ISessionWorkerClientFactory + { + private readonly Queue _workerClients; + + public QueueingSessionWorkerClientFactory(params IWorkerClient[] workerClients) + { + _workerClients = new Queue(workerClients); + } + + public Task CreateAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + return Task.FromResult(_workerClients.Dequeue()); + } + } + + private sealed class FailingSessionWorkerClientFactory : ISessionWorkerClientFactory + { + public Task CreateAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + throw new InvalidOperationException("worker startup failed"); + } + } + + private sealed class FakeWorkerClient : IWorkerClient + { + public string SessionId { get; init; } = "session-1"; + + public int? ProcessId { get; init; } = 1234; + + public WorkerClientState State { get; set; } = WorkerClientState.Ready; + + public DateTimeOffset LastHeartbeatAt { get; init; } = DateTimeOffset.UtcNow; + + public int InvokeCount { get; private set; } + + public int ShutdownCount { get; private set; } + + public int KillCount { get; private set; } + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task InvokeAsync( + WorkerCommand command, + TimeSpan timeout, + CancellationToken cancellationToken) + { + InvokeCount++; + MxCommandKind kind = command.Command?.Kind ?? MxCommandKind.Unspecified; + + return Task.FromResult(new WorkerCommandReply + { + Reply = new MxCommandReply + { + SessionId = SessionId, + CorrelationId = "correlation-1", + Kind = kind, + }, + }); + } + + public async IAsyncEnumerable ReadEventsAsync( + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield break; + } + + public Task ShutdownAsync( + TimeSpan timeout, + CancellationToken cancellationToken) + { + ShutdownCount++; + State = WorkerClientState.Closed; + return Task.CompletedTask; + } + + public void Kill(string reason) + { + KillCount++; + State = WorkerClientState.Faulted; + } + + public ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + } +} -- 2.52.0