using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;

namespace MxGateway.Server.Sessions;

/// <summary>
/// Represents one gateway session: its identity and timeout configuration, its lifecycle state,
/// the attached worker client, the client lease, and the items registered through it.
/// </summary>
/// <remarks>
/// Mutable state is guarded by a private monitor; <see cref="CloseAsync"/> is additionally
/// serialized by a semaphore so only one close attempt runs at a time.
/// </remarks>
public sealed class GatewaySession
{
    /// <summary>Lease duration applied by the constructor overload that does not take one.</summary>
    private static readonly TimeSpan DefaultLeaseDuration = TimeSpan.FromMinutes(30);

    private readonly object _syncRoot = new();
    private readonly SemaphoreSlim _closeLock = new(1, 1);
    private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = [];
    private IWorkerClient? _workerClient;
    private SessionState _state = SessionState.Creating;
    private string? _finalFault;
    private DateTimeOffset _lastClientActivityAt;
    private DateTimeOffset? _leaseExpiresAt;
    private bool _closeStarted;
    private int _activeEventSubscriberCount;

    /// <summary>
    /// Initializes a gateway session with session metadata and timeout configuration,
    /// using a 30-minute lease duration.
    /// </summary>
    /// <param name="sessionId">Identifier of the session.</param>
    /// <param name="backendName">Name of the backend MXAccess proxy server.</param>
    /// <param name="pipeName">Name of the named pipe for gateway-worker IPC.</param>
    /// <param name="nonce">Security nonce for worker validation.</param>
    /// <param name="clientIdentity">Client identity from the authentication context.</param>
    /// <param name="clientSessionName">Client-supplied session name.</param>
    /// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
    /// <param name="commandTimeout">Timeout for command invocation.</param>
    /// <param name="startupTimeout">Timeout for worker process startup.</param>
    /// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
    /// <param name="openedAt">Timestamp when the session opened.</param>
    /// <exception cref="ArgumentException">
    /// A required string argument is null, empty, or whitespace.
    /// </exception>
    public GatewaySession(
        string sessionId,
        string backendName,
        string pipeName,
        string nonce,
        string? clientIdentity,
        string? clientSessionName,
        string? clientCorrelationId,
        TimeSpan commandTimeout,
        TimeSpan startupTimeout,
        TimeSpan shutdownTimeout,
        DateTimeOffset openedAt)
        : this(
            sessionId,
            backendName,
            pipeName,
            nonce,
            clientIdentity,
            clientSessionName,
            clientCorrelationId,
            commandTimeout,
            startupTimeout,
            shutdownTimeout,
            DefaultLeaseDuration,
            openedAt)
    {
    }

    /// <summary>
    /// Initializes a gateway session with session metadata, timeout configuration,
    /// and an explicit lease duration.
    /// </summary>
    /// <param name="sessionId">Identifier of the session.</param>
    /// <param name="backendName">Name of the backend MXAccess proxy server.</param>
    /// <param name="pipeName">Name of the named pipe for gateway-worker IPC.</param>
    /// <param name="nonce">Security nonce for worker validation.</param>
    /// <param name="clientIdentity">Client identity from the authentication context.</param>
    /// <param name="clientSessionName">Client-supplied session name.</param>
    /// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
    /// <param name="commandTimeout">Timeout for command invocation.</param>
    /// <param name="startupTimeout">Timeout for worker process startup.</param>
    /// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
    /// <param name="leaseDuration">
    /// Amount of time added to a client-activity timestamp to compute the lease expiration.
    /// </param>
    /// <param name="openedAt">
    /// Timestamp when the session opened; also seeds the last-activity timestamp and the initial lease.
    /// </param>
    /// <exception cref="ArgumentException">
    /// <paramref name="sessionId"/>, <paramref name="backendName"/>, <paramref name="pipeName"/>,
    /// or <paramref name="nonce"/> is null, empty, or whitespace.
    /// </exception>
    public GatewaySession(
        string sessionId,
        string backendName,
        string pipeName,
        string nonce,
        string? clientIdentity,
        string? clientSessionName,
        string? clientCorrelationId,
        TimeSpan commandTimeout,
        TimeSpan startupTimeout,
        TimeSpan shutdownTimeout,
        TimeSpan leaseDuration,
        DateTimeOffset openedAt)
    {
        if (string.IsNullOrWhiteSpace(sessionId))
        {
            throw new ArgumentException("Session id is required.", nameof(sessionId));
        }

        if (string.IsNullOrWhiteSpace(backendName))
        {
            throw new ArgumentException("Backend name is required.", nameof(backendName));
        }

        if (string.IsNullOrWhiteSpace(pipeName))
        {
            throw new ArgumentException("Pipe name is required.", nameof(pipeName));
        }

        if (string.IsNullOrWhiteSpace(nonce))
        {
            throw new ArgumentException("Nonce is required.", nameof(nonce));
        }

        SessionId = sessionId;
        BackendName = backendName;
        PipeName = pipeName;
        Nonce = nonce;
        ClientIdentity = clientIdentity;
        ClientSessionName = clientSessionName;
        ClientCorrelationId = clientCorrelationId;
        CommandTimeout = commandTimeout;
        StartupTimeout = startupTimeout;
        ShutdownTimeout = shutdownTimeout;
        LeaseDuration = leaseDuration;
        OpenedAt = openedAt;
        _lastClientActivityAt = openedAt;
        _leaseExpiresAt = openedAt + leaseDuration;
    }

    /// <summary>Gets the session identifier.</summary>
    public string SessionId { get; }

    /// <summary>Gets the backend MXAccess proxy server name.</summary>
    public string BackendName { get; }

    /// <summary>Gets the named pipe name for gateway-worker IPC.</summary>
    public string PipeName { get; }

    /// <summary>Gets the security nonce for worker validation.</summary>
    public string Nonce { get; }

    /// <summary>Gets the client identity from the authentication context.</summary>
    public string? ClientIdentity { get; }

    /// <summary>Gets the client-supplied session name.</summary>
    public string? ClientSessionName { get; }

    /// <summary>Gets the client-supplied correlation identifier.</summary>
    public string? ClientCorrelationId { get; }

    /// <summary>Gets the command invocation timeout.</summary>
    public TimeSpan CommandTimeout { get; }

    /// <summary>Gets the worker process startup timeout.</summary>
    public TimeSpan StartupTimeout { get; }

    /// <summary>Gets the worker process shutdown timeout.</summary>
    public TimeSpan ShutdownTimeout { get; }

    /// <summary>
    /// Gets the duration added to each client-activity timestamp to compute the lease expiration.
    /// </summary>
    public TimeSpan LeaseDuration { get; }

    /// <summary>Gets the timestamp when the session opened.</summary>
    public DateTimeOffset OpenedAt { get; }

    /// <summary>Gets the worker process identifier, or null if no worker client is attached.</summary>
    public int? WorkerProcessId => _workerClient?.ProcessId;

    /// <summary>Gets the attached worker client, or null if not yet attached.</summary>
    public IWorkerClient? WorkerClient => _workerClient;

    /// <summary>Gets the current session state.</summary>
    public SessionState State
    {
        get
        {
            lock (_syncRoot)
            {
                return _state;
            }
        }
    }

    /// <summary>Gets the timestamp of the most recent client activity.</summary>
    public DateTimeOffset LastClientActivityAt
    {
        get
        {
            lock (_syncRoot)
            {
                return _lastClientActivityAt;
            }
        }
    }

    /// <summary>Gets the lease expiration timestamp, or null if no lease is active.</summary>
    public DateTimeOffset? LeaseExpiresAt
    {
        get
        {
            lock (_syncRoot)
            {
                return _leaseExpiresAt;
            }
        }
    }

    /// <summary>Gets the fault description recorded by <see cref="MarkFaulted"/>, or null.</summary>
    public string? FinalFault
    {
        get
        {
            lock (_syncRoot)
            {
                return _finalFault;
            }
        }
    }

    /// <summary>Gets the count of active event stream subscribers.</summary>
    public int ActiveEventSubscriberCount
    {
        get
        {
            lock (_syncRoot)
            {
                return _activeEventSubscriberCount;
            }
        }
    }

    /// <summary>Attaches the worker client for this session, replacing any previous one.</summary>
    /// <param name="workerClient">Worker client to attach.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workerClient"/> is null.</exception>
    public void AttachWorkerClient(IWorkerClient workerClient)
    {
        ArgumentNullException.ThrowIfNull(workerClient);

        lock (_syncRoot)
        {
            _workerClient = workerClient;
        }
    }

    /// <summary>
    /// Transitions the session to a new state. A closed session never changes state, and a
    /// faulted session can only move to <see cref="SessionState.Closed"/>; rejected transitions
    /// are ignored silently.
    /// </summary>
    /// <param name="nextState">Next session state to transition to.</param>
    public void TransitionTo(SessionState nextState)
    {
        lock (_syncRoot)
        {
            if (_state is SessionState.Closed)
            {
                return;
            }

            if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
            {
                return;
            }

            _state = nextState;
        }
    }

    /// <summary>Transitions the session to the Ready state.</summary>
    public void MarkReady()
    {
        TransitionTo(SessionState.Ready);
    }

    /// <summary>
    /// Transitions the session to the Faulted state and records the fault description.
    /// Has no effect once the session is closed.
    /// </summary>
    /// <param name="reason">Reason for the fault.</param>
    public void MarkFaulted(string reason)
    {
        lock (_syncRoot)
        {
            if (_state is SessionState.Closed)
            {
                return;
            }

            _finalFault = reason;
            _state = SessionState.Faulted;
        }
    }

    /// <summary>
    /// Records client activity and renews the lease to
    /// <paramref name="activityAt"/> plus <see cref="LeaseDuration"/>.
    /// </summary>
    /// <param name="activityAt">Timestamp of the client activity.</param>
    public void TouchClientActivity(DateTimeOffset activityAt)
    {
        lock (_syncRoot)
        {
            _lastClientActivityAt = activityAt;
            _leaseExpiresAt = activityAt + LeaseDuration;
        }
    }

    /// <summary>Sets the session lease expiration to the specified time.</summary>
    /// <param name="leaseExpiresAt">Timestamp when the lease expires.</param>
    public void ExtendLease(DateTimeOffset leaseExpiresAt)
    {
        lock (_syncRoot)
        {
            _leaseExpiresAt = leaseExpiresAt;
        }
    }

    /// <summary>
    /// Determines whether the session lease has expired. A session with at least one active
    /// event subscriber is never reported as expired.
    /// </summary>
    /// <param name="now">Current timestamp for comparison.</param>
    /// <returns>
    /// True when there are no event subscribers and a lease expiration is set at or before
    /// <paramref name="now"/>.
    /// </returns>
    public bool IsLeaseExpired(DateTimeOffset now)
    {
        lock (_syncRoot)
        {
            return _activeEventSubscriberCount == 0
                && _leaseExpiresAt is not null
                && _leaseExpiresAt <= now;
        }
    }

    /// <summary>Attaches an event subscriber and returns a disposable lease that detaches it.</summary>
    /// <param name="allowMultipleSubscribers">If true, allows multiple concurrent event subscribers.</param>
    /// <returns>A lease whose disposal decrements the active subscriber count.</returns>
    /// <exception cref="SessionManagerException">
    /// The session or its worker client is not ready, or a subscriber is already active and
    /// <paramref name="allowMultipleSubscribers"/> is false.
    /// </exception>
    public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
    {
        lock (_syncRoot)
        {
            if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
            {
                throw new SessionManagerException(
                    SessionManagerErrorCode.SessionNotReady,
                    $"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
            }

            if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
            {
                throw new SessionManagerException(
                    SessionManagerErrorCode.EventSubscriberAlreadyActive,
                    $"Session {SessionId} already has an active event stream subscriber.");
            }

            _activeEventSubscriberCount++;
            return new EventSubscriberLease(this);
        }
    }

    /// <summary>
    /// Invokes a worker command using <see cref="CommandTimeout"/> and returns the reply.
    /// Counts as client activity and renews the lease.
    /// </summary>
    /// <param name="command">Worker command to invoke.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The reply produced by the worker client.</returns>
    /// <exception cref="SessionManagerException">The session or its worker client is not ready.</exception>
    public async Task<WorkerCommandReply> InvokeAsync(
        WorkerCommand command,
        CancellationToken cancellationToken)
    {
        IWorkerClient workerClient = GetReadyWorkerClient();
        TouchClientActivity(DateTimeOffset.UtcNow);

        return await workerClient
            .InvokeAsync(command, CommandTimeout, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>Looks up the registration tracked for a server/item handle pair.</summary>
    /// <param name="serverHandle">Server handle the item was added under.</param>
    /// <param name="itemHandle">Item handle returned when the item was added.</param>
    /// <param name="registration">The tracked registration when the method returns true.</param>
    /// <returns>True if a registration exists for the handle pair; otherwise false.</returns>
    public bool TryGetItemRegistration(
        int serverHandle,
        int itemHandle,
        out SessionItemRegistration registration)
    {
        lock (_syncRoot)
        {
            return _items.TryGetValue((serverHandle, itemHandle), out registration!);
        }
    }

    /// <summary>
    /// Updates the tracked item registrations from a command and its reply. Replies whose
    /// protocol status is not Ok are ignored, as are commands whose payload for the declared
    /// kind is missing.
    /// </summary>
    /// <param name="command">Command that was sent.</param>
    /// <param name="reply">Reply received for <paramref name="command"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="command"/> or <paramref name="reply"/> is null.
    /// </exception>
    public void TrackCommandReply(
        MxCommand command,
        MxCommandReply reply)
    {
        ArgumentNullException.ThrowIfNull(command);
        ArgumentNullException.ThrowIfNull(reply);

        if (reply.ProtocolStatus?.Code is not ProtocolStatusCode.Ok)
        {
            return;
        }

        lock (_syncRoot)
        {
            // The payload guards keep a Kind/payload mismatch from throwing inside the lock
            // after the command has already been reported as successful.
            switch (command.Kind)
            {
                case MxCommandKind.AddItem when command.AddItem is not null && reply.AddItem is not null:
                    TrackItem(
                        command.AddItem.ServerHandle,
                        reply.AddItem.ItemHandle,
                        command.AddItem.ItemDefinition);
                    break;

                case MxCommandKind.AddItem2 when command.AddItem2 is not null && reply.AddItem2 is not null:
                    TrackItem(
                        command.AddItem2.ServerHandle,
                        reply.AddItem2.ItemHandle,
                        command.AddItem2.ItemDefinition);
                    break;

                case MxCommandKind.AddBufferedItem
                    when command.AddBufferedItem is not null && reply.AddBufferedItem is not null:
                    TrackItem(
                        command.AddBufferedItem.ServerHandle,
                        reply.AddBufferedItem.ItemHandle,
                        command.AddBufferedItem.ItemDefinition);
                    break;

                case MxCommandKind.AddItemBulk when reply.AddItemBulk is not null:
                    TrackBulkItems(reply.AddItemBulk);
                    break;

                case MxCommandKind.SubscribeBulk when reply.SubscribeBulk is not null:
                    TrackBulkItems(reply.SubscribeBulk);
                    break;

                case MxCommandKind.RemoveItem when command.RemoveItem is not null:
                    _items.Remove((command.RemoveItem.ServerHandle, command.RemoveItem.ItemHandle));
                    break;

                case MxCommandKind.RemoveItemBulk when command.RemoveItemBulk is not null:
                    RemoveItems(command.RemoveItemBulk.ServerHandle, command.RemoveItemBulk.ItemHandles);
                    break;

                case MxCommandKind.UnsubscribeBulk when command.UnsubscribeBulk is not null:
                    RemoveItems(command.UnsubscribeBulk.ServerHandle, command.UnsubscribeBulk.ItemHandles);
                    break;
            }
        }
    }

    /// <summary>Executes a bulk add-item command for the specified server and tag addresses.</summary>
    /// <param name="serverHandle">Server handle returned by the worker.</param>
    /// <param name="tagAddresses">Tag addresses to add.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The per-item results from the reply, or an empty list if the reply has no payload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tagAddresses"/> is null.</exception>
    /// <exception cref="SessionManagerException">The session is not ready or the command failed.</exception>
    public Task<IReadOnlyList<SubscribeResult>> AddItemBulkAsync(
        int serverHandle,
        IReadOnlyList<string> tagAddresses,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(tagAddresses);

        AddItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
        bulkCommand.TagAddresses.Add(tagAddresses);

        return InvokeBulkAsync(
            new MxCommand
            {
                Kind = MxCommandKind.AddItemBulk,
                AddItemBulk = bulkCommand,
            },
            reply => reply.AddItemBulk,
            cancellationToken);
    }

    /// <summary>Executes a bulk advise-item command for the specified server and item handles.</summary>
    /// <param name="serverHandle">Server handle returned by the worker.</param>
    /// <param name="itemHandles">Item handles to advise.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The per-item results from the reply, or an empty list if the reply has no payload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
    /// <exception cref="SessionManagerException">The session is not ready or the command failed.</exception>
    public Task<IReadOnlyList<SubscribeResult>> AdviseItemBulkAsync(
        int serverHandle,
        IReadOnlyList<int> itemHandles,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        AdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
        bulkCommand.ItemHandles.Add(itemHandles);

        return InvokeBulkAsync(
            new MxCommand
            {
                Kind = MxCommandKind.AdviseItemBulk,
                AdviseItemBulk = bulkCommand,
            },
            reply => reply.AdviseItemBulk,
            cancellationToken);
    }

    /// <summary>Executes a bulk remove-item command for the specified server and item handles.</summary>
    /// <param name="serverHandle">Server handle returned by the worker.</param>
    /// <param name="itemHandles">Item handles to remove.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The per-item results from the reply, or an empty list if the reply has no payload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
    /// <exception cref="SessionManagerException">The session is not ready or the command failed.</exception>
    public Task<IReadOnlyList<SubscribeResult>> RemoveItemBulkAsync(
        int serverHandle,
        IReadOnlyList<int> itemHandles,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        RemoveItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
        bulkCommand.ItemHandles.Add(itemHandles);

        return InvokeBulkAsync(
            new MxCommand
            {
                Kind = MxCommandKind.RemoveItemBulk,
                RemoveItemBulk = bulkCommand,
            },
            reply => reply.RemoveItemBulk,
            cancellationToken);
    }

    /// <summary>Executes a bulk un-advise-item command for the specified server and item handles.</summary>
    /// <param name="serverHandle">Server handle returned by the worker.</param>
    /// <param name="itemHandles">Item handles to un-advise.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The per-item results from the reply, or an empty list if the reply has no payload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
    /// <exception cref="SessionManagerException">The session is not ready or the command failed.</exception>
    public Task<IReadOnlyList<SubscribeResult>> UnAdviseItemBulkAsync(
        int serverHandle,
        IReadOnlyList<int> itemHandles,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        UnAdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
        bulkCommand.ItemHandles.Add(itemHandles);

        return InvokeBulkAsync(
            new MxCommand
            {
                Kind = MxCommandKind.UnAdviseItemBulk,
                UnAdviseItemBulk = bulkCommand,
            },
            reply => reply.UnAdviseItemBulk,
            cancellationToken);
    }

    /// <summary>Executes a bulk subscribe command for the specified server and tag addresses.</summary>
    /// <param name="serverHandle">Server handle returned by the worker.</param>
    /// <param name="tagAddresses">Tag addresses to subscribe to.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The per-item results from the reply, or an empty list if the reply has no payload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tagAddresses"/> is null.</exception>
    /// <exception cref="SessionManagerException">The session is not ready or the command failed.</exception>
    public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
        int serverHandle,
        IReadOnlyList<string> tagAddresses,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(tagAddresses);

        SubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
        bulkCommand.TagAddresses.Add(tagAddresses);

        return InvokeBulkAsync(
            new MxCommand
            {
                Kind = MxCommandKind.SubscribeBulk,
                SubscribeBulk = bulkCommand,
            },
            reply => reply.SubscribeBulk,
            cancellationToken);
    }

    /// <summary>Executes a bulk unsubscribe command for the specified server and item handles.</summary>
    /// <param name="serverHandle">Server handle returned by the worker.</param>
    /// <param name="itemHandles">Item handles to unsubscribe from.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The per-item results from the reply, or an empty list if the reply has no payload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is null.</exception>
    /// <exception cref="SessionManagerException">The session is not ready or the command failed.</exception>
    public Task<IReadOnlyList<SubscribeResult>> UnsubscribeBulkAsync(
        int serverHandle,
        IReadOnlyList<int> itemHandles,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        UnsubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
        bulkCommand.ItemHandles.Add(itemHandles);

        return InvokeBulkAsync(
            new MxCommand
            {
                Kind = MxCommandKind.UnsubscribeBulk,
                UnsubscribeBulk = bulkCommand,
            },
            reply => reply.UnsubscribeBulk,
            cancellationToken);
    }

    /// <summary>
    /// Reads events from the worker as an asynchronous enumerable stream.
    /// Counts as client activity and renews the lease.
    /// </summary>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The event stream exposed by the worker client.</returns>
    /// <exception cref="SessionManagerException">The session or its worker client is not ready.</exception>
    // NOTE(review): the element type must match IWorkerClient.ReadEventsAsync — confirm WorkerEvent is correct.
    public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
    {
        IWorkerClient workerClient = GetReadyWorkerClient();
        TouchClientActivity(DateTimeOffset.UtcNow);

        return workerClient.ReadEventsAsync(cancellationToken);
    }

    /// <summary>
    /// Closes the session: marks it Closing, asks the attached worker client (if any) to shut
    /// down within <see cref="ShutdownTimeout"/>, then marks the session Closed. If shutdown
    /// fails the worker is killed and the failure is rethrown wrapped.
    /// </summary>
    /// <param name="reason">Reason for closing the session; passed to the worker kill on failure.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>
    /// The close result. Its already-closed flag is true when the session was already Closed,
    /// or when an earlier close attempt had already started.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// Cancellation was requested while waiting for the close lock.
    /// </exception>
    /// <exception cref="SessionCloseStartedException">
    /// Any failure after the close lock was acquired; the session is left in the Closing state.
    /// </exception>
    public async Task<SessionCloseResult> CloseAsync(
        string reason,
        CancellationToken cancellationToken)
    {
        await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);

        try
        {
            try
            {
                bool alreadyClosing;
                IWorkerClient? workerClient;

                // State is read and written under the same monitor every other member uses,
                // so readers of State never observe a stale value. The worker client is
                // snapshotted so shutdown and kill target the same instance.
                lock (_syncRoot)
                {
                    if (_state is SessionState.Closed)
                    {
                        return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
                    }

                    alreadyClosing = _closeStarted;
                    _closeStarted = true;
                    _state = SessionState.Closing;
                    workerClient = _workerClient;
                }

                if (workerClient is not null)
                {
                    try
                    {
                        await workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
                    }
                    catch (Exception exception)
                    {
                        try
                        {
                            workerClient.Kill(reason);
                        }
                        catch (Exception killException)
                        {
                            throw new SessionCloseStartedException(
                                $"Session {SessionId} close failed after worker shutdown started.",
                                new AggregateException(exception, killException));
                        }

                        throw;
                    }
                }

                lock (_syncRoot)
                {
                    _state = SessionState.Closed;
                }

                return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
            }
            catch (Exception exception) when (exception is not SessionCloseStartedException)
            {
                throw new SessionCloseStartedException(
                    $"Session {SessionId} close failed after the close lock was acquired.",
                    exception);
            }
        }
        finally
        {
            _closeLock.Release();
        }
    }

    /// <summary>
    /// Kills the attached worker client, if any, and transitions the session to Closed.
    /// </summary>
    /// <param name="reason">Reason for killing the worker.</param>
    public void KillWorker(string reason)
    {
        _workerClient?.Kill(reason);
        TransitionTo(SessionState.Closed);
    }

    /// <summary>
    /// Disposes the close lock and then the attached worker client, if any.
    /// </summary>
    /// <returns>A task that completes when the worker client has been disposed.</returns>
    public async ValueTask DisposeAsync()
    {
        _closeLock.Dispose();

        if (_workerClient is not null)
        {
            await _workerClient.DisposeAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Sends a bulk command through <see cref="InvokeAsync"/> and extracts its per-item results.
    /// </summary>
    /// <param name="command">Bulk command to send.</param>
    /// <param name="payloadAccessor">Selects the bulk payload for this command kind from the reply.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The payload's results, or an empty list when the selected payload is null.</returns>
    /// <exception cref="SessionManagerException">
    /// The session is not ready, the worker reply carried no public reply, or the reply's
    /// protocol status is not Ok.
    /// </exception>
    private async Task<IReadOnlyList<SubscribeResult>> InvokeBulkAsync(
        MxCommand command,
        Func<MxCommandReply, BulkSubscribeReply?> payloadAccessor,
        CancellationToken cancellationToken)
    {
        WorkerCommandReply workerReply = await InvokeAsync(
                new WorkerCommand { Command = command },
                cancellationToken)
            .ConfigureAwait(false);

        // A missing public reply is treated as a protocol violation so it takes the same
        // failure path as an explicit non-Ok status.
        MxCommandReply reply = workerReply.Reply ?? new MxCommandReply
        {
            ProtocolStatus = new ProtocolStatus
            {
                Code = ProtocolStatusCode.ProtocolViolation,
                Message = "Worker command reply did not contain a public reply payload.",
            },
        };

        if (reply.ProtocolStatus?.Code is not ProtocolStatusCode.Ok)
        {
            // Fall back on a blank message, not only a null one: an empty status message
            // must not hide the diagnostic text.
            string? statusMessage = reply.ProtocolStatus?.Message;
            string? message = string.IsNullOrWhiteSpace(statusMessage)
                ? reply.DiagnosticMessage
                : statusMessage;

            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotReady,
                string.IsNullOrWhiteSpace(message) ? "Bulk MXAccess command failed." : message);
        }

        return payloadAccessor(reply)?.Results.ToArray() ?? [];
    }

    /// <summary>Returns the attached worker client when both the session and the client are Ready.</summary>
    /// <exception cref="SessionManagerException">
    /// The session is not Ready, no worker client is attached, or the client is not Ready.
    /// </exception>
    private IWorkerClient GetReadyWorkerClient()
    {
        lock (_syncRoot)
        {
            IWorkerClient? workerClient = _workerClient;

            if (_state != SessionState.Ready
                || workerClient is null
                || workerClient.State != WorkerClientState.Ready)
            {
                throw new SessionManagerException(
                    SessionManagerErrorCode.SessionNotReady,
                    $"Session {SessionId} is not ready. Current state is {_state}.");
            }

            return workerClient;
        }
    }

    /// <summary>
    /// Records or overwrites the registration for a handle pair. Entries with a zero item
    /// handle or a blank tag address are skipped. Callers must hold <c>_syncRoot</c>.
    /// </summary>
    private void TrackItem(
        int serverHandle,
        int itemHandle,
        string tagAddress)
    {
        if (itemHandle == 0 || string.IsNullOrWhiteSpace(tagAddress))
        {
            return;
        }

        _items[(serverHandle, itemHandle)] = new SessionItemRegistration(serverHandle, itemHandle, tagAddress);
    }

    /// <summary>
    /// Tracks every successful result in a bulk reply. Callers must hold <c>_syncRoot</c>.
    /// </summary>
    private void TrackBulkItems(BulkSubscribeReply reply)
    {
        foreach (SubscribeResult result in reply.Results)
        {
            if (result.WasSuccessful)
            {
                TrackItem(result.ServerHandle, result.ItemHandle, result.TagAddress);
            }
        }
    }

    /// <summary>
    /// Removes the registrations for the given item handles under one server handle.
    /// Callers must hold <c>_syncRoot</c>.
    /// </summary>
    private void RemoveItems(
        int serverHandle,
        IEnumerable<int> itemHandles)
    {
        foreach (int itemHandle in itemHandles)
        {
            _items.Remove((serverHandle, itemHandle));
        }
    }

    /// <summary>Decrements the active event subscriber count, never going below zero.</summary>
    private void DetachEventSubscriber()
    {
        lock (_syncRoot)
        {
            if (_activeEventSubscriberCount > 0)
            {
                _activeEventSubscriberCount--;
            }
        }
    }

    /// <summary>Lease handed to an event subscriber; disposing it detaches that subscriber once.</summary>
    private sealed class EventSubscriberLease(GatewaySession session) : IDisposable
    {
        // 0 = live, 1 = disposed. An int so the flag can be flipped atomically.
        private int _disposed;

        /// <summary>
        /// Detaches the event subscriber. Only the first call has an effect, even when
        /// several threads dispose the lease at the same time.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            session.DetachEventSubscriber();
        }
    }
}