Issue #12: implement session manager and registry

This commit is contained in:
Joseph Doherty
2026-04-26 17:29:36 -04:00
parent cde9c89386
commit d496f1fd75
16 changed files with 1236 additions and 0 deletions
@@ -4,6 +4,7 @@ using MxGateway.Server.Diagnostics;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server;
@@ -31,6 +32,7 @@ public static class GatewayApplication
builder.Services.AddHealthChecks();
builder.Services.AddSingleton<GatewayMetrics>();
builder.Services.AddWorkerProcessLauncher();
builder.Services.AddGatewaySessions();
return builder;
}
@@ -0,0 +1,290 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class GatewaySession
{
private readonly object _syncRoot = new();
private readonly SemaphoreSlim _closeLock = new(1, 1);
private IWorkerClient? _workerClient;
private SessionState _state = SessionState.Creating;
private string? _finalFault;
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(backendName))
{
throw new ArgumentException("Backend name is required.", nameof(backendName));
}
if (string.IsNullOrWhiteSpace(pipeName))
{
throw new ArgumentException("Pipe name is required.", nameof(pipeName));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Nonce is required.", nameof(nonce));
}
SessionId = sessionId;
BackendName = backendName;
PipeName = pipeName;
Nonce = nonce;
ClientIdentity = clientIdentity;
ClientSessionName = clientSessionName;
ClientCorrelationId = clientCorrelationId;
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
}
public string SessionId { get; }
public string BackendName { get; }
public string PipeName { get; }
public string Nonce { get; }
public string? ClientIdentity { get; }
public string? ClientSessionName { get; }
public string? ClientCorrelationId { get; }
public TimeSpan CommandTimeout { get; }
public TimeSpan StartupTimeout { get; }
public TimeSpan ShutdownTimeout { get; }
public DateTimeOffset OpenedAt { get; }
public int? WorkerProcessId => _workerClient?.ProcessId;
public IWorkerClient? WorkerClient => _workerClient;
public SessionState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
public DateTimeOffset LastClientActivityAt
{
get
{
lock (_syncRoot)
{
return _lastClientActivityAt;
}
}
}
public DateTimeOffset? LeaseExpiresAt
{
get
{
lock (_syncRoot)
{
return _leaseExpiresAt;
}
}
}
public string? FinalFault
{
get
{
lock (_syncRoot)
{
return _finalFault;
}
}
}
public void AttachWorkerClient(IWorkerClient workerClient)
{
ArgumentNullException.ThrowIfNull(workerClient);
lock (_syncRoot)
{
_workerClient = workerClient;
}
}
public void TransitionTo(SessionState nextState)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
{
return;
}
_state = nextState;
}
}
public void MarkReady()
{
TransitionTo(SessionState.Ready);
}
public void MarkFaulted(string reason)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
_finalFault = reason;
_state = SessionState.Faulted;
}
}
public void TouchClientActivity(DateTimeOffset activityAt)
{
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
}
}
public void ExtendLease(DateTimeOffset leaseExpiresAt)
{
lock (_syncRoot)
{
_leaseExpiresAt = leaseExpiresAt;
}
}
public bool IsLeaseExpired(DateTimeOffset now)
{
lock (_syncRoot)
{
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
}
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return workerClient.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseAsync(
string reason,
CancellationToken cancellationToken)
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_state is SessionState.Closed)
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
}
bool alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
if (_workerClient is not null)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch
{
_workerClient.Kill(reason);
throw;
}
}
_state = SessionState.Closed;
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
finally
{
_closeLock.Release();
}
}
public void KillWorker(string reason)
{
_workerClient?.Kill(reason);
TransitionTo(SessionState.Closed);
}
public async ValueTask DisposeAsync()
{
_closeLock.Dispose();
if (_workerClient is not null)
{
await _workerClient.DisposeAsync().ConfigureAwait(false);
}
}
private IWorkerClient GetReadyWorkerClient()
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready. Current state is {_state}.");
}
return _workerClient;
}
}
}
@@ -0,0 +1,34 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public interface ISessionManager
{
Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken);
bool TryGetSession(
string sessionId,
out GatewaySession session);
Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken);
Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken);
Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken);
Task ShutdownAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Sessions;
public interface ISessionRegistry
{
int Count { get; }
int ActiveCount { get; }
bool TryAdd(GatewaySession session);
bool TryGet(string sessionId, out GatewaySession session);
bool TryRemove(string sessionId, out GatewaySession session);
IReadOnlyCollection<GatewaySession> Snapshot();
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Sessions;
public interface ISessionWorkerClientFactory
{
Task<MxGateway.Server.Workers.IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionCloseResult(
string SessionId,
SessionState FinalState,
bool AlreadyClosed);
@@ -0,0 +1,287 @@
using System.Security.Cryptography;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionManager : ISessionManager
{
public const string DefaultCloseReason = "client-close";
public const string GatewayShutdownReason = "gateway-shutdown";
public const string LeaseExpiredReason = "lease-expired";
private readonly ISessionRegistry _registry;
private readonly ISessionWorkerClientFactory _workerClientFactory;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<SessionManager> _logger;
private readonly GatewayOptions _options;
public SessionManager(
ISessionRegistry registry,
ISessionWorkerClientFactory workerClientFactory,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
TimeProvider? timeProvider = null,
ILogger<SessionManager>? logger = null)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<SessionManager>.Instance;
_options = options.Value;
}
public async Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
EnsureSessionCapacity();
GatewaySession session = CreateSession(request, clientIdentity);
if (!_registry.TryAdd(session))
{
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Session id collision while opening session {session.SessionId}.");
}
try
{
session.TransitionTo(SessionState.StartingWorker);
IWorkerClient workerClient = await _workerClientFactory
.CreateAsync(session, cancellationToken)
.ConfigureAwait(false);
session.AttachWorkerClient(workerClient);
session.MarkReady();
_metrics.SessionOpened();
return session;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_registry.TryRemove(session.SessionId, out _);
await session.DisposeAsync().ConfigureAwait(false);
_metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
_logger.LogWarning(
exception,
"Failed to open gateway session {SessionId}.",
session.SessionId);
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Failed to open session {session.SessionId}.",
exception);
}
}
public bool TryGetSession(
string sessionId,
out GatewaySession session)
{
return _registry.TryGet(sessionId, out session);
}
public async Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
try
{
return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
}
catch (SessionManagerException)
{
throw;
}
catch (Exception exception)
{
if (session.WorkerClient?.State == WorkerClientState.Faulted)
{
session.MarkFaulted(exception.Message);
}
throw;
}
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
return session.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
SessionCloseResult result = await CloseSessionCoreAsync(
session,
DefaultCloseReason,
cancellationToken).ConfigureAwait(false);
return result;
}
public async Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
int closedCount = 0;
foreach (GatewaySession session in _registry.Snapshot())
{
if (!session.IsLeaseExpired(now))
{
continue;
}
await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
closedCount++;
}
return closedCount;
}
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (GatewaySession session in _registry.Snapshot())
{
try
{
await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.LogWarning(
exception,
"Graceful shutdown failed for session {SessionId}; killing worker.",
session.SessionId);
session.KillWorker(GatewayShutdownReason);
}
}
}
private async Task<SessionCloseResult> CloseSessionCoreAsync(
GatewaySession session,
string reason,
CancellationToken cancellationToken)
{
bool wasClosed = session.State == SessionState.Closed;
try
{
SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false);
if (!wasClosed && !result.AlreadyClosed)
{
_metrics.SessionClosed();
}
return result;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
throw new SessionManagerException(
SessionManagerErrorCode.CloseFailed,
$"Failed to close session {session.SessionId}.",
exception);
}
}
private GatewaySession GetRequiredSession(string sessionId)
{
if (!_registry.TryGet(sessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {sessionId} was not found.");
}
return session;
}
private void EnsureSessionCapacity()
{
if (_registry.ActiveCount >= _options.Sessions.MaxSessions)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionLimitExceeded,
$"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
}
}
private GatewaySession CreateSession(
SessionOpenRequest request,
string? clientIdentity)
{
string sessionId = CreateSessionId();
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
? GatewayContractInfo.DefaultBackendName
: request.RequestedBackend!;
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
string nonce = CreateNonce();
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
return new GatewaySession(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
request.ClientSessionName,
request.ClientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
openedAt);
}
private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
{
if (requestedTimeout is null)
{
return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
}
TimeSpan timeout = requestedTimeout.ToTimeSpan();
return timeout <= TimeSpan.Zero
? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds)
: timeout;
}
private static string CreateSessionId()
{
return $"session-{Guid.NewGuid():N}";
}
private static string CreateNonce()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes);
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Sessions;
public enum SessionManagerErrorCode
{
SessionNotFound,
SessionNotReady,
SessionLimitExceeded,
OpenFailed,
CloseFailed,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Sessions;
public sealed class SessionManagerException : Exception
{
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public SessionManagerErrorCode ErrorCode { get; }
}
@@ -0,0 +1,22 @@
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionOpenRequest(
string? RequestedBackend,
string? ClientSessionName,
string? ClientCorrelationId,
Duration? CommandTimeout)
{
public static SessionOpenRequest FromContract(OpenSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
return new SessionOpenRequest(
request.RequestedBackend,
request.ClientSessionName,
request.ClientCorrelationId,
request.CommandTimeout);
}
}
@@ -0,0 +1,39 @@
using System.Collections.Concurrent;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed class SessionRegistry : ISessionRegistry
{
private readonly ConcurrentDictionary<string, GatewaySession> _sessions = new(StringComparer.Ordinal);
public int Count => _sessions.Count;
public int ActiveCount => _sessions.Values.Count(session => session.State is not SessionState.Closed);
public bool TryAdd(GatewaySession session)
{
ArgumentNullException.ThrowIfNull(session);
return _sessions.TryAdd(session.SessionId, session);
}
public bool TryGet(
string sessionId,
out GatewaySession session)
{
return _sessions.TryGetValue(sessionId, out session!);
}
public bool TryRemove(
string sessionId,
out GatewaySession session)
{
return _sessions.TryRemove(sessionId, out session!);
}
public IReadOnlyCollection<GatewaySession> Snapshot()
{
return _sessions.Values.ToArray();
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Sessions;
public static class SessionServiceCollectionExtensions
{
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
return services;
}
}
@@ -0,0 +1,144 @@
using System.IO.Pipes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory
{
private readonly IWorkerProcessLauncher _workerProcessLauncher;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly GatewayOptions _options;
public SessionWorkerClientFactory(
IWorkerProcessLauncher workerProcessLauncher,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
ILoggerFactory loggerFactory,
TimeProvider? timeProvider = null)
{
_workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_timeProvider = timeProvider ?? TimeProvider.System;
_options = options.Value;
}
public async Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(session);
NamedPipeServerStream? pipe = CreatePipe(session.PipeName);
WorkerProcessHandle? processHandle = null;
IWorkerClient? workerClient = null;
try
{
session.TransitionTo(SessionState.StartingWorker);
processHandle = await _workerProcessLauncher
.LaunchAsync(
new WorkerProcessLaunchRequest(
session.SessionId,
session.PipeName,
GatewayContractInfo.WorkerProtocolVersion,
session.Nonce,
pipe),
cancellationToken)
.ConfigureAwait(false);
session.TransitionTo(SessionState.WaitingForPipe);
await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false);
session.TransitionTo(SessionState.Handshaking);
WorkerFrameProtocolOptions frameOptions = new(
session.SessionId,
GatewayContractInfo.WorkerProtocolVersion,
_options.Worker.MaxMessageBytes);
WorkerClientConnection connection = new(
session.SessionId,
session.Nonce,
pipe,
frameOptions,
processHandle);
WorkerClientOptions clientOptions = new()
{
HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds),
HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds),
EventChannelCapacity = _options.Events.QueueCapacity,
};
workerClient = new WorkerClient(
connection,
clientOptions,
_metrics,
_timeProvider,
_loggerFactory.CreateLogger<WorkerClient>());
pipe = null;
processHandle = null;
session.TransitionTo(SessionState.InitializingWorker);
await workerClient.StartAsync(cancellationToken).ConfigureAwait(false);
return workerClient;
}
catch
{
if (workerClient is not null)
{
await workerClient.DisposeAsync().ConfigureAwait(false);
}
else
{
if (processHandle is not null)
{
try
{
if (!processHandle.Process.HasExited)
{
processHandle.Process.Kill(entireProcessTree: true);
_metrics.WorkerKilled("OpenSessionFailed");
}
}
finally
{
processHandle.Dispose();
}
}
pipe?.Dispose();
}
throw;
}
}
private static NamedPipeServerStream CreatePipe(string pipeName)
{
return new NamedPipeServerStream(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
}
private static async Task WaitForPipeConnectionAsync(
NamedPipeServerStream pipe,
TimeSpan startupTimeout,
CancellationToken cancellationToken)
{
using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeout.CancelAfter(startupTimeout);
await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false);
}
}