using System.Security.Cryptography;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;

namespace MxGateway.Server.Sessions;

/// <summary>
/// Opens, tracks and closes gateway sessions. Each open session occupies one
/// slot of a semaphore sized by <c>Sessions.MaxSessions</c>; the slot is
/// returned when the session is removed from the registry.
/// </summary>
public sealed class SessionManager : ISessionManager
{
    /// <summary>Close reason passed by <see cref="CloseSessionAsync"/>.</summary>
    public const string DefaultCloseReason = "client-close";

    /// <summary>Close reason passed by <see cref="ShutdownAsync"/>.</summary>
    public const string GatewayShutdownReason = "gateway-shutdown";

    /// <summary>Close reason passed by <see cref="CloseExpiredLeasesAsync"/>.</summary>
    public const string LeaseExpiredReason = "lease-expired";

    private readonly ISessionRegistry _registry;
    private readonly ISessionWorkerClientFactory _workerClientFactory;
    private readonly GatewayMetrics _metrics;
    private readonly TimeProvider _timeProvider;

    // NOTE(review): typed as the non-generic ILogger, which compiles as-is.
    // If the container is expected to inject a logger, confirm whether
    // ILogger<SessionManager> was intended.
    private readonly ILogger _logger;
    private readonly GatewayOptions _options;

    // One permit per allowed session. Acquired in EnsureSessionCapacity and
    // returned by whichever code path removes the session from the registry.
    private readonly SemaphoreSlim _sessionSlots;

    /// <summary>
    /// Initializes a new instance of <see cref="SessionManager"/>.
    /// </summary>
    /// <param name="registry">Session registry.</param>
    /// <param name="workerClientFactory">Worker client factory.</param>
    /// <param name="options">Gateway options.</param>
    /// <param name="metrics">Gateway metrics.</param>
    /// <param name="timeProvider">Time provider for timestamps; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <param name="logger">Logger; defaults to a no-op logger.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="registry"/>, <paramref name="workerClientFactory"/>,
    /// <paramref name="options"/> or <paramref name="metrics"/> is null.
    /// </exception>
    public SessionManager(
        ISessionRegistry registry,
        ISessionWorkerClientFactory workerClientFactory,
        IOptions<GatewayOptions> options,
        GatewayMetrics metrics,
        TimeProvider? timeProvider = null,
        ILogger? logger = null)
    {
        _registry = registry ?? throw new ArgumentNullException(nameof(registry));
        _workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
        ArgumentNullException.ThrowIfNull(options);
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? NullLogger.Instance;
        _options = options.Value;
        _sessionSlots = new SemaphoreSlim(_options.Sessions.MaxSessions, _options.Sessions.MaxSessions);
    }

    /// <summary>
    /// Opens a new gateway session and connects to the worker.
    /// </summary>
    /// <param name="request">Session open request.</param>
    /// <param name="clientIdentity">Client authentication identity.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Opened gateway session.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="SessionManagerException">
    /// The session limit has been reached (<c>SessionLimitExceeded</c>), or any
    /// step of opening the session failed (<c>OpenFailed</c>, with the cause as
    /// inner exception).
    /// </exception>
    public async Task<GatewaySession> OpenSessionAsync(
        SessionOpenRequest request,
        string? clientIdentity,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);
        EnsureSessionCapacity();

        GatewaySession? session = null;
        bool registered = false;
        bool sessionOpenedRecorded = false;
        try
        {
            session = CreateSession(request, clientIdentity);
            if (!_registry.TryAdd(session))
            {
                throw new SessionManagerException(
                    SessionManagerErrorCode.OpenFailed,
                    $"Session id collision while opening session {session.SessionId}.");
            }

            registered = true;
            session.TransitionTo(SessionState.StartingWorker);

            IWorkerClient workerClient = await _workerClientFactory
                .CreateAsync(session, cancellationToken)
                .ConfigureAwait(false);

            session.AttachWorkerClient(workerClient);
            session.MarkReady();
            _metrics.SessionOpened();
            sessionOpenedRecorded = true;

            await TryAutoSubscribeAlarmsAsync(session, cancellationToken).ConfigureAwait(false);
            return session;
        }
        catch (Exception exception)
        {
            // The slot taken by EnsureSessionCapacity is ours to return unless
            // another path already removed the session (and released the slot
            // through RemoveSessionAsync).
            bool releaseSlot = true;

            if (session is not null)
            {
                session.MarkFaulted(exception.Message);

                // Only remove the registry entry if this call created it. After
                // an id collision the entry belongs to a different, live session
                // and must not be evicted.
                if (registered)
                {
                    releaseSlot = _registry.TryRemove(session.SessionId, out _);
                }

                try
                {
                    await session.DisposeAsync().ConfigureAwait(false);
                }
                catch (Exception disposeException)
                {
                    // A failing dispose must not skip the slot/metric cleanup
                    // below or hide the original failure.
                    _logger.LogWarning(
                        disposeException,
                        "Failed to dispose gateway session {SessionId} after a failed open.",
                        session.SessionId);
                }
            }

            // If SessionOpened() already incremented the open-session gauge,
            // a failure after that point (e.g. auto-subscribe rejection) must
            // decrement it again so mxgateway.sessions.open does not leak.
            if (sessionOpenedRecorded)
            {
                _metrics.SessionRemoved();
            }

            if (releaseSlot)
            {
                ReleaseSessionSlot();
            }

            _metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
            _logger.LogWarning(
                exception,
                "Failed to open gateway session {SessionId}.",
                session?.SessionId ?? "");

            throw new SessionManagerException(
                SessionManagerErrorCode.OpenFailed,
                session is null
                    ? "Failed to create session."
                    : $"Failed to open session {session.SessionId}.",
                exception);
        }
    }

    /// <summary>
    /// Attempts to retrieve a session by ID.
    /// </summary>
    /// <param name="sessionId">Session identifier.</param>
    /// <param name="session">The session if found.</param>
    /// <returns>True if session found; otherwise false.</returns>
    public bool TryGetSession(
        string sessionId,
        out GatewaySession session)
    {
        return _registry.TryGet(sessionId, out session);
    }

    /// <summary>
    /// Invokes a worker command on a session asynchronously.
    /// </summary>
    /// <param name="sessionId">Session identifier.</param>
    /// <param name="command">Worker command.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Command reply.</returns>
    /// <exception cref="SessionManagerException">The session was not found (<c>SessionNotFound</c>).</exception>
    public async Task<WorkerCommandReply> InvokeAsync(
        string sessionId,
        WorkerCommand command,
        CancellationToken cancellationToken)
    {
        GatewaySession session = GetRequiredSession(sessionId);
        try
        {
            return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
        }
        catch (SessionManagerException)
        {
            throw;
        }
        catch (Exception exception)
        {
            // Mirror a faulted worker client onto the session before the
            // failure is propagated unchanged to the caller.
            if (session.WorkerClient?.State == WorkerClientState.Faulted)
            {
                session.MarkFaulted(exception.Message);
            }

            throw;
        }
    }

    /// <summary>
    /// Reads events from a session's event stream asynchronously.
    /// </summary>
    /// <param name="sessionId">Session identifier.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of worker events.</returns>
    /// <exception cref="SessionManagerException">The session was not found (<c>SessionNotFound</c>).</exception>
    // NOTE(review): the element type is assumed to be WorkerEvent; confirm it
    // matches ISessionManager and GatewaySession.ReadEventsAsync.
    public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
        string sessionId,
        CancellationToken cancellationToken)
    {
        GatewaySession session = GetRequiredSession(sessionId);
        return session.ReadEventsAsync(cancellationToken);
    }

    /// <summary>
    /// Closes a gateway session asynchronously using <see cref="DefaultCloseReason"/>.
    /// </summary>
    /// <param name="sessionId">Session identifier.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Session close result.</returns>
    /// <exception cref="SessionManagerException">
    /// The session was not found (<c>SessionNotFound</c>) or closing it failed (<c>CloseFailed</c>).
    /// </exception>
    public async Task<SessionCloseResult> CloseSessionAsync(
        string sessionId,
        CancellationToken cancellationToken)
    {
        GatewaySession session = GetRequiredSession(sessionId);
        SessionCloseResult result = await CloseSessionCoreAsync(
            session,
            DefaultCloseReason,
            cancellationToken).ConfigureAwait(false);
        return result;
    }

    /// <summary>
    /// Closes all sessions with expired leases asynchronously.
    /// </summary>
    /// <param name="now">Current time for lease expiration check.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Count of sessions closed.</returns>
    /// <remarks>
    /// A failure while closing one session propagates immediately; sessions
    /// later in the snapshot are not visited by that call.
    /// </remarks>
    public async Task<int> CloseExpiredLeasesAsync(
        DateTimeOffset now,
        CancellationToken cancellationToken)
    {
        int closedCount = 0;
        foreach (GatewaySession session in _registry.Snapshot())
        {
            if (!session.IsLeaseExpired(now))
            {
                continue;
            }

            await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
            closedCount++;
        }

        return closedCount;
    }

    /// <summary>
    /// Shuts down all active sessions gracefully asynchronously. A session
    /// whose graceful close throws has its worker killed and is removed from
    /// the registry if it is still registered.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Completed task.</returns>
    public async Task ShutdownAsync(CancellationToken cancellationToken)
    {
        foreach (GatewaySession session in _registry.Snapshot())
        {
            try
            {
                await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception exception)
            {
                _logger.LogWarning(
                    exception,
                    "Graceful shutdown failed for session {SessionId}; killing worker.",
                    session.SessionId);

                if (_registry.TryGet(session.SessionId, out _))
                {
                    session.KillWorker(GatewayShutdownReason);
                    await RemoveSessionAsync(session).ConfigureAwait(false);
                }
            }
        }
    }

    /// <summary>
    /// Closes <paramref name="session"/> with the given reason, updates the
    /// metrics and removes the session from the registry.
    /// </summary>
    /// <param name="session">Session to close.</param>
    /// <param name="reason">Close reason forwarded to the session.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The result reported by the session's close operation.</returns>
    /// <exception cref="OperationCanceledException">
    /// Rethrown unchanged when <paramref name="cancellationToken"/> was cancelled;
    /// the session is not removed in that case.
    /// </exception>
    /// <exception cref="SessionManagerException">
    /// The close raised <see cref="SessionCloseStartedException"/>; the session is
    /// marked faulted and removed before this is thrown (<c>CloseFailed</c>).
    /// </exception>
    private async Task<SessionCloseResult> CloseSessionCoreAsync(
        GatewaySession session,
        string reason,
        CancellationToken cancellationToken)
    {
        // Sampled before closing so an already-closed session is not counted twice.
        bool wasClosed = session.State == SessionState.Closed;
        try
        {
            SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false);
            if (!wasClosed && !result.AlreadyClosed)
            {
                _metrics.SessionClosed();
            }

            await RemoveSessionAsync(session).ConfigureAwait(false);
            return result;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (SessionCloseStartedException exception)
        {
            session.MarkFaulted(exception.Message);
            if (!wasClosed)
            {
                _metrics.SessionRemoved();
            }

            _metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
            await RemoveSessionAsync(session).ConfigureAwait(false);

            throw new SessionManagerException(
                SessionManagerErrorCode.CloseFailed,
                $"Failed to close session {session.SessionId}.",
                exception);
        }
    }

    /// <summary>
    /// Looks up a session and throws when it is not registered.
    /// </summary>
    /// <param name="sessionId">Session identifier.</param>
    /// <returns>The registered session.</returns>
    /// <exception cref="SessionManagerException">No session with that id is registered (<c>SessionNotFound</c>).</exception>
    private GatewaySession GetRequiredSession(string sessionId)
    {
        if (!_registry.TryGet(sessionId, out GatewaySession session))
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotFound,
                $"Session {sessionId} was not found.");
        }

        return session;
    }

    /// <summary>
    /// Takes one session slot without waiting.
    /// </summary>
    /// <exception cref="SessionManagerException">No slot is available (<c>SessionLimitExceeded</c>).</exception>
    private void EnsureSessionCapacity()
    {
        if (!_sessionSlots.Wait(0))
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionLimitExceeded,
                $"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
        }
    }

    /// <summary>
    /// Removes the session from the registry and, only if this call actually
    /// removed it, drops its event metrics, returns its slot and disposes it.
    /// </summary>
    /// <param name="session">Session to remove.</param>
    private async Task RemoveSessionAsync(GatewaySession session)
    {
        if (!_registry.TryRemove(session.SessionId, out GatewaySession? removedSession))
        {
            return;
        }

        _metrics.RemoveSessionEvents(session.SessionId);
        ReleaseSessionSlot();
        await removedSession.DisposeAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Returns one session slot. An over-release is logged and otherwise ignored.
    /// </summary>
    private void ReleaseSessionSlot()
    {
        try
        {
            _sessionSlots.Release();
        }
        catch (SemaphoreFullException exception)
        {
            // Best effort: the semaphore is already at MaxSessions, so this
            // release had no matching acquire. Record it instead of failing.
            _logger.LogWarning(
                exception,
                "Session slot released while all {MaxSessions} slots were already free.",
                _options.Sessions.MaxSessions);
        }
    }

    /// <summary>
    /// Builds a new, unregistered session from the request and gateway options.
    /// </summary>
    /// <param name="request">Session open request.</param>
    /// <param name="clientIdentity">Client authentication identity.</param>
    /// <returns>The new session.</returns>
    private GatewaySession CreateSession(
        SessionOpenRequest request,
        string? clientIdentity)
    {
        string sessionId = CreateSessionId();
        string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
            ? GatewayContractInfo.DefaultBackendName
            : request.RequestedBackend!;
        TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
        TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
        TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
        TimeSpan leaseDuration = TimeSpan.FromSeconds(_options.Sessions.DefaultLeaseSeconds);
        string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
        string nonce = CreateNonce();
        DateTimeOffset openedAt = _timeProvider.GetUtcNow();
        string clientCorrelationId = CreateClientCorrelationId(request.ClientSessionName, sessionId);

        return new GatewaySession(
            sessionId,
            backendName,
            pipeName,
            nonce,
            clientIdentity,
            request.ClientSessionName,
            clientCorrelationId,
            commandTimeout,
            startupTimeout,
            shutdownTimeout,
            leaseDuration,
            openedAt);
    }

    /// <summary>
    /// Builds <c>{clientSessionName}-{sessionId}</c>, substituting <c>client</c>
    /// when the client session name is null or whitespace.
    /// </summary>
    private static string CreateClientCorrelationId(
        string? clientSessionName,
        string sessionId)
    {
        string clientName = string.IsNullOrWhiteSpace(clientSessionName)
            ? "client"
            : clientSessionName!;

        return $"{clientName}-{sessionId}";
    }

    /// <summary>
    /// Returns the requested command timeout, or the configured default when
    /// the request has none or it is zero or negative.
    /// </summary>
    private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
    {
        if (requestedTimeout is null)
        {
            return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
        }

        TimeSpan timeout = requestedTimeout.ToTimeSpan();
        return timeout <= TimeSpan.Zero
            ? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds)
            : timeout;
    }

    /// <summary>Creates a session id of the form <c>session-{guid:N}</c>.</summary>
    private static string CreateSessionId()
    {
        return $"session-{Guid.NewGuid():N}";
    }

    /// <summary>Creates a Base64 string from 32 cryptographically random bytes.</summary>
    private static string CreateNonce()
    {
        Span<byte> bytes = stackalloc byte[32];
        RandomNumberGenerator.Fill(bytes);
        return Convert.ToBase64String(bytes);
    }

    /// <summary>
    /// If <c>Alarms.Enabled</c> is configured, issue a
    /// <c>SubscribeAlarmsCommand</c> on the freshly-Ready session so the
    /// worker's wnwrap consumer starts polling. Failure handling is
    /// governed by <c>Alarms.RequireSubscribeOnOpen</c>:
    /// <list type="bullet">
    /// <item><description><c>true</c> — propagate the failure to fault the session.</description></item>
    /// <item><description><c>false</c> (default) — log a warning and let the session continue serving data subscriptions.</description></item>
    /// </list>
    /// Cancellation of <paramref name="cancellationToken"/> is never treated
    /// as a tolerated subscribe failure; it always propagates.
    /// </summary>
    /// <param name="session">Session that has just been marked ready.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    private async Task TryAutoSubscribeAlarmsAsync(
        GatewaySession session,
        CancellationToken cancellationToken)
    {
        AlarmsOptions alarms = _options.Alarms;
        if (!alarms.Enabled)
        {
            return;
        }

        string subscription = ResolveAlarmSubscription(alarms);
        if (string.IsNullOrWhiteSpace(subscription))
        {
            const string diagnostic =
                "Alarms.Enabled is true but no SubscriptionExpression / DefaultArea is configured.";
            if (alarms.RequireSubscribeOnOpen)
            {
                throw new SessionManagerException(
                    SessionManagerErrorCode.OpenFailed,
                    diagnostic);
            }

            _logger.LogWarning(
                "Auto-subscribe skipped for session {SessionId}: {Diagnostic}",
                session.SessionId,
                diagnostic);
            return;
        }

        WorkerCommand command = new WorkerCommand
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.SubscribeAlarms,
                SubscribeAlarms = new SubscribeAlarmsCommand
                {
                    SubscriptionExpression = subscription,
                },
            },
            EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
        };

        try
        {
            WorkerCommandReply reply = await session.InvokeAsync(command, cancellationToken)
                .ConfigureAwait(false);

            // A missing reply or status yields null, which also counts as "not Ok".
            ProtocolStatusCode? code = reply.Reply?.ProtocolStatus?.Code;
            if (code != ProtocolStatusCode.Ok)
            {
                string diagnostic = reply.Reply?.DiagnosticMessage
                    ?? reply.Reply?.ProtocolStatus?.Message
                    ?? "Worker rejected SubscribeAlarms.";
                if (alarms.RequireSubscribeOnOpen)
                {
                    throw new SessionManagerException(
                        SessionManagerErrorCode.OpenFailed,
                        $"Auto-subscribe failed for session {session.SessionId}: {diagnostic}");
                }

                _logger.LogWarning(
                    "Auto-subscribe failed for session {SessionId} (status {StatusCode}): {Diagnostic}",
                    session.SessionId,
                    code,
                    diagnostic);
                return;
            }

            _logger.LogInformation(
                "Alarm auto-subscribe succeeded for session {SessionId} on {Subscription}.",
                session.SessionId,
                subscription);
        }
        catch (SessionManagerException)
        {
            throw;
        }
        catch (Exception ex) when (!alarms.RequireSubscribeOnOpen && !cancellationToken.IsCancellationRequested)
        {
            // Tolerated failure. The filter excludes caller cancellation so a
            // cancelled open is not reported as a successfully opened session.
            _logger.LogWarning(
                ex,
                "Auto-subscribe threw for session {SessionId} on {Subscription}; alarm path remains inactive.",
                session.SessionId,
                subscription);
        }
    }

    /// <summary>
    /// Picks the alarm subscription expression: the configured
    /// <c>SubscriptionExpression</c> if set, otherwise one derived from the
    /// local machine name and <c>DefaultArea</c>, otherwise an empty string.
    /// </summary>
    private static string ResolveAlarmSubscription(AlarmsOptions alarms)
    {
        if (!string.IsNullOrWhiteSpace(alarms.SubscriptionExpression))
        {
            return alarms.SubscriptionExpression;
        }

        if (!string.IsNullOrWhiteSpace(alarms.DefaultArea))
        {
            return $@"\\{Environment.MachineName}\Galaxy!{alarms.DefaultArea}";
        }

        return string.Empty;
    }
}