using System.Runtime.ExceptionServices;
using System.Security.Cryptography;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;

namespace MxGateway.Server.Sessions;

/// <summary>
/// Coordinates the lifecycle of gateway sessions: creating and registering a session,
/// attaching its worker client, forwarding commands and event reads to the session,
/// closing sessions on request, on lease expiry and on gateway shutdown, and recording
/// the corresponding metrics.
/// </summary>
public sealed class SessionManager : ISessionManager
{
    /// <summary>Close reason passed to the session by <see cref="CloseSessionAsync"/>.</summary>
    public const string DefaultCloseReason = "client-close";

    /// <summary>Close/kill reason passed to the session by <see cref="ShutdownAsync"/>.</summary>
    public const string GatewayShutdownReason = "gateway-shutdown";

    /// <summary>Close reason passed to the session by <see cref="CloseExpiredLeasesAsync"/>.</summary>
    public const string LeaseExpiredReason = "lease-expired";

    private readonly ISessionRegistry _registry;
    private readonly ISessionWorkerClientFactory _workerClientFactory;
    private readonly GatewayMetrics _metrics;
    private readonly TimeProvider _timeProvider;

    // NOTE(review): the non-generic ILogger / NullLogger.Instance pair compiles as written.
    // If the rest of the project injects ILogger<SessionManager>, confirm which one is intended.
    private readonly ILogger _logger;
    private readonly GatewayOptions _options;

    /// <summary>
    /// Initializes a new <see cref="SessionManager"/>.
    /// </summary>
    /// <param name="registry">Registry that stores the sessions managed by this instance.</param>
    /// <param name="workerClientFactory">Factory used to create the worker client for a new session.</param>
    /// <param name="options">Gateway options; the value is read once, at construction.</param>
    /// <param name="metrics">Metrics sink for session-opened, session-closed and fault events.</param>
    /// <param name="timeProvider">Clock used to timestamp new sessions; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <param name="logger">Logger; defaults to <see cref="NullLogger.Instance"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="registry"/>, <paramref name="workerClientFactory"/>,
    /// <paramref name="options"/> or <paramref name="metrics"/> is <see langword="null"/>.
    /// </exception>
    public SessionManager(
        ISessionRegistry registry,
        ISessionWorkerClientFactory workerClientFactory,
        IOptions<GatewayOptions> options,
        GatewayMetrics metrics,
        TimeProvider? timeProvider = null,
        ILogger? logger = null)
    {
        _registry = registry ?? throw new ArgumentNullException(nameof(registry));
        _workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
        ArgumentNullException.ThrowIfNull(options);
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? NullLogger.Instance;
        _options = options.Value;
    }

    /// <summary>
    /// Creates a session, registers it, creates and attaches its worker client, and marks it ready.
    /// </summary>
    /// <param name="request">The open request supplying backend, names, correlation id and command timeout.</param>
    /// <param name="clientIdentity">Identity of the calling client, if known.</param>
    /// <param name="cancellationToken">Token passed to the worker client factory.</param>
    /// <returns>The opened session.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
    /// <exception cref="SessionManagerException">
    /// The session limit has been reached (<see cref="SessionManagerErrorCode.SessionLimitExceeded"/>),
    /// the generated session id is already registered, or any step after registration failed
    /// (<see cref="SessionManagerErrorCode.OpenFailed"/>).
    /// </exception>
    public async Task<GatewaySession> OpenSessionAsync(
        SessionOpenRequest request,
        string? clientIdentity,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);
        EnsureSessionCapacity();

        GatewaySession session = CreateSession(request, clientIdentity);
        if (!_registry.TryAdd(session))
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.OpenFailed,
                $"Session id collision while opening session {session.SessionId}.");
        }

        try
        {
            session.TransitionTo(SessionState.StartingWorker);
            IWorkerClient workerClient = await _workerClientFactory
                .CreateAsync(session, cancellationToken)
                .ConfigureAwait(false);
            session.AttachWorkerClient(workerClient);
            session.MarkReady();
            _metrics.SessionOpened();
            return session;
        }
        catch (Exception exception)
        {
            // Roll back: the session must not stay registered once opening has failed.
            session.MarkFaulted(exception.Message);
            _registry.TryRemove(session.SessionId, out _);

            // Disposal is guarded so that a cleanup failure cannot replace the original
            // error or skip the fault metric, the log entry and the wrapper exception below.
            await DisposeFailedSessionAsync(session).ConfigureAwait(false);

            _metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
            _logger.LogWarning(
                exception,
                "Failed to open gateway session {SessionId}.",
                session.SessionId);

            throw new SessionManagerException(
                SessionManagerErrorCode.OpenFailed,
                $"Failed to open session {session.SessionId}.",
                exception);
        }
    }

    /// <summary>
    /// Looks up a session in the registry.
    /// </summary>
    /// <param name="sessionId">Id of the session to look up.</param>
    /// <param name="session">Receives the value produced by the registry lookup.</param>
    /// <returns>The result of the registry lookup.</returns>
    public bool TryGetSession(
        string sessionId,
        out GatewaySession session)
    {
        return _registry.TryGet(sessionId, out session);
    }

    /// <summary>
    /// Forwards a command to the session identified by <paramref name="sessionId"/>.
    /// If the call fails with anything other than a <see cref="SessionManagerException"/> and the
    /// session's worker client reports <see cref="WorkerClientState.Faulted"/>, the session is
    /// marked faulted before the exception is rethrown.
    /// </summary>
    /// <param name="sessionId">Id of the target session.</param>
    /// <param name="command">Command to invoke.</param>
    /// <param name="cancellationToken">Token passed to the session.</param>
    /// <returns>The awaited result of the session's own <c>InvokeAsync</c> call.</returns>
    /// <exception cref="SessionManagerException">The session was not found.</exception>
    // NOTE(review): this method returns the awaited result of GatewaySession.InvokeAsync, so the
    // declared return type needs that result type as its Task<T> argument. The type is not visible
    // from this file; confirm it against ISessionManager / GatewaySession and restore it.
    public async Task InvokeAsync(
        string sessionId,
        WorkerCommand command,
        CancellationToken cancellationToken)
    {
        GatewaySession session = GetRequiredSession(sessionId);
        try
        {
            return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
        }
        catch (SessionManagerException)
        {
            throw;
        }
        catch (Exception exception)
        {
            if (session.WorkerClient?.State == WorkerClientState.Faulted)
            {
                session.MarkFaulted(exception.Message);
            }

            throw;
        }
    }

    /// <summary>
    /// Returns the event stream of the session identified by <paramref name="sessionId"/>.
    /// </summary>
    /// <param name="sessionId">Id of the target session.</param>
    /// <param name="cancellationToken">Token passed to the session's event reader.</param>
    /// <returns>The sequence returned by the session's own <c>ReadEventsAsync</c> call.</returns>
    /// <exception cref="SessionManagerException">The session was not found.</exception>
    // NOTE(review): the element type argument of IAsyncEnumerable is not visible from this file;
    // confirm it against ISessionManager / GatewaySession and restore it.
    public IAsyncEnumerable ReadEventsAsync(
        string sessionId,
        CancellationToken cancellationToken)
    {
        GatewaySession session = GetRequiredSession(sessionId);
        return session.ReadEventsAsync(cancellationToken);
    }

    /// <summary>
    /// Closes the session identified by <paramref name="sessionId"/> with
    /// <see cref="DefaultCloseReason"/>.
    /// </summary>
    /// <param name="sessionId">Id of the session to close.</param>
    /// <param name="cancellationToken">Token passed to the session's close operation.</param>
    /// <returns>The close result reported by the session.</returns>
    /// <exception cref="SessionManagerException">
    /// The session was not found, or closing it failed
    /// (<see cref="SessionManagerErrorCode.CloseFailed"/>).
    /// </exception>
    public async Task<SessionCloseResult> CloseSessionAsync(
        string sessionId,
        CancellationToken cancellationToken)
    {
        GatewaySession session = GetRequiredSession(sessionId);
        return await CloseSessionCoreAsync(session, DefaultCloseReason, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Closes every session in the registry snapshot whose lease is expired at
    /// <paramref name="now"/>, using <see cref="LeaseExpiredReason"/>.
    /// </summary>
    /// <remarks>
    /// A session that fails to close does not stop the sweep: the failure is logged, the remaining
    /// expired sessions are still processed, and the first failure is rethrown once the sweep has
    /// finished. If cancellation has been requested the failure propagates immediately.
    /// </remarks>
    /// <param name="now">Instant against which leases are evaluated.</param>
    /// <param name="cancellationToken">Token passed to each close operation.</param>
    /// <returns>The number of expired sessions whose close call completed without throwing.</returns>
    /// <exception cref="SessionManagerException">At least one expired session could not be closed.</exception>
    public async Task<int> CloseExpiredLeasesAsync(
        DateTimeOffset now,
        CancellationToken cancellationToken)
    {
        int closedCount = 0;
        SessionManagerException? firstFailure = null;

        foreach (GatewaySession session in _registry.Snapshot())
        {
            if (!session.IsLeaseExpired(now))
            {
                continue;
            }

            try
            {
                await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken)
                    .ConfigureAwait(false);
                closedCount++;
            }
            catch (SessionManagerException exception) when (!cancellationToken.IsCancellationRequested)
            {
                // One stuck session must not keep every other expired session alive.
                _logger.LogWarning(
                    exception,
                    "Failed to close expired session {SessionId}; continuing lease sweep.",
                    session.SessionId);
                firstFailure ??= exception;
            }
        }

        if (firstFailure is not null)
        {
            // Callers still observe the failure, with its original stack trace preserved.
            ExceptionDispatchInfo.Throw(firstFailure);
        }

        return closedCount;
    }

    /// <summary>
    /// Closes every session in the registry snapshot with <see cref="GatewayShutdownReason"/>.
    /// When a graceful close fails, the failure is logged and the session's worker is killed instead.
    /// </summary>
    /// <param name="cancellationToken">Token passed to each close operation.</param>
    public async Task ShutdownAsync(CancellationToken cancellationToken)
    {
        foreach (GatewaySession session in _registry.Snapshot())
        {
            try
            {
                await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken)
                    .ConfigureAwait(false);
            }
            catch (Exception exception)
            {
                _logger.LogWarning(
                    exception,
                    "Graceful shutdown failed for session {SessionId}; killing worker.",
                    session.SessionId);

                try
                {
                    session.KillWorker(GatewayShutdownReason);
                }
                catch (Exception killException)
                {
                    // Keep going: the remaining sessions still have to be shut down.
                    _logger.LogError(
                        killException,
                        "Failed to kill worker for session {SessionId} during gateway shutdown.",
                        session.SessionId);
                }
            }
        }
    }

    /// <summary>
    /// Closes <paramref name="session"/> and records the session-closed metric unless the session
    /// was already closed before the call or the result reports it as already closed.
    /// Any failure marks the session faulted, records a fault metric and is wrapped in a
    /// <see cref="SessionManagerException"/> with <see cref="SessionManagerErrorCode.CloseFailed"/>.
    /// </summary>
    private async Task<SessionCloseResult> CloseSessionCoreAsync(
        GatewaySession session,
        string reason,
        CancellationToken cancellationToken)
    {
        // Sampled before closing so that a repeated close is not counted twice.
        bool wasClosed = session.State == SessionState.Closed;
        try
        {
            SessionCloseResult result = await session
                .CloseAsync(reason, cancellationToken)
                .ConfigureAwait(false);

            if (!wasClosed && !result.AlreadyClosed)
            {
                _metrics.SessionClosed();
            }

            return result;
        }
        catch (Exception exception)
        {
            session.MarkFaulted(exception.Message);
            _metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
            throw new SessionManagerException(
                SessionManagerErrorCode.CloseFailed,
                $"Failed to close session {session.SessionId}.",
                exception);
        }
    }

    /// <summary>
    /// Disposes a session whose open attempt failed, logging (instead of propagating) any
    /// exception thrown by the disposal itself.
    /// </summary>
    private async Task DisposeFailedSessionAsync(GatewaySession session)
    {
        try
        {
            await session.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception disposeException)
        {
            _logger.LogWarning(
                disposeException,
                "Failed to dispose gateway session {SessionId} after a failed open.",
                session.SessionId);
        }
    }

    /// <summary>
    /// Returns the registered session with the given id.
    /// </summary>
    /// <exception cref="SessionManagerException">
    /// No such session is registered (<see cref="SessionManagerErrorCode.SessionNotFound"/>).
    /// </exception>
    private GatewaySession GetRequiredSession(string sessionId)
    {
        if (!_registry.TryGet(sessionId, out GatewaySession session))
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotFound,
                $"Session {sessionId} was not found.");
        }

        return session;
    }

    /// <summary>
    /// Throws when the registry's active session count has reached the configured maximum.
    /// </summary>
    /// <exception cref="SessionManagerException">
    /// The limit has been reached (<see cref="SessionManagerErrorCode.SessionLimitExceeded"/>).
    /// </exception>
    private void EnsureSessionCapacity()
    {
        if (_registry.ActiveCount >= _options.Sessions.MaxSessions)
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionLimitExceeded,
                $"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
        }
    }

    /// <summary>
    /// Builds a new, unregistered session from the open request and the configured worker timeouts.
    /// </summary>
    private GatewaySession CreateSession(
        SessionOpenRequest request,
        string? clientIdentity)
    {
        string sessionId = CreateSessionId();

        // A missing or blank backend name selects the contract's default backend.
        string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
            ? GatewayContractInfo.DefaultBackendName
            : request.RequestedBackend!;

        TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
        TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
        TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);

        // The pipe name embeds this process id and the session id.
        string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
        string nonce = CreateNonce();
        DateTimeOffset openedAt = _timeProvider.GetUtcNow();

        return new GatewaySession(
            sessionId,
            backendName,
            pipeName,
            nonce,
            clientIdentity,
            request.ClientSessionName,
            request.ClientCorrelationId,
            commandTimeout,
            startupTimeout,
            shutdownTimeout,
            openedAt);
    }

    /// <summary>
    /// Returns the requested command timeout when it is present and positive; otherwise the
    /// configured default command timeout.
    /// </summary>
    private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
    {
        TimeSpan defaultTimeout = TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
        if (requestedTimeout is null)
        {
            return defaultTimeout;
        }

        TimeSpan timeout = requestedTimeout.ToTimeSpan();
        return timeout <= TimeSpan.Zero ? defaultTimeout : timeout;
    }

    /// <summary>
    /// Creates a session id of the form <c>session-</c> followed by a new GUID in "N" format.
    /// </summary>
    private static string CreateSessionId()
    {
        return $"session-{Guid.NewGuid():N}";
    }

    /// <summary>
    /// Creates a nonce: 32 bytes from <see cref="RandomNumberGenerator"/>, Base64-encoded.
    /// </summary>
    private static string CreateNonce()
    {
        Span<byte> bytes = stackalloc byte[32];
        RandomNumberGenerator.Fill(bytes);
        return Convert.ToBase64String(bytes);
    }
}