Files
mxaccessgw/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
T
Joseph Doherty 615b487a77 docs+ui: backfill XML doc comments and finish dashboard layout pass
Adds missing <summary>/<param> XML docs across 99 server, worker, and test
files so CommentChecker reports zero issues (TreatWarningsAsErrors needs the
analyzer clean). Bundles in WIP dashboard work: NavSection extraction,
MainLayout/site.css/js styling alignment, and DashboardOptions/Auth tweaks.
2026-05-27 14:20:10 -04:00

1123 lines
41 KiB
C#

using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
public sealed class GatewaySession
{
private readonly object _syncRoot = new();
private readonly SemaphoreSlim _closeLock = new(1, 1);
private IWorkerClient? _workerClient;
private SessionState _state = SessionState.Creating;
private string? _finalFault;
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
private int _activeEventSubscriberCount;
private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = [];
/// <summary>
/// Initializes a gateway session with session metadata and timeout configuration.
/// </summary>
/// <param name="sessionId">Identifier of the session.</param>
/// <param name="backendName">Name of the backend MXAccess proxy server.</param>
/// <param name="pipeName">Name of the named pipe for gateway-worker IPC.</param>
/// <param name="nonce">Security nonce for worker validation.</param>
/// <param name="clientIdentity">Client identity from the authentication context.</param>
/// <param name="clientSessionName">Client-supplied session name.</param>
/// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
/// <param name="commandTimeout">Timeout for command invocation.</param>
/// <param name="startupTimeout">Timeout for worker process startup.</param>
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
/// <param name="openedAt">Timestamp when the session opened.</param>
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
: this(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
clientSessionName,
clientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
TimeSpan.FromMinutes(30),
openedAt)
{
}
/// <summary>
/// Initializes a gateway session with session metadata, timeout configuration, and custom lease duration.
/// </summary>
/// <param name="sessionId">Identifier of the session.</param>
/// <param name="backendName">Name of the backend MXAccess proxy server.</param>
/// <param name="pipeName">Name of the named pipe for gateway-worker IPC.</param>
/// <param name="nonce">Security nonce for worker validation.</param>
/// <param name="clientIdentity">Client identity from the authentication context.</param>
/// <param name="clientSessionName">Client-supplied session name.</param>
/// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
/// <param name="commandTimeout">Timeout for command invocation.</param>
/// <param name="startupTimeout">Timeout for worker process startup.</param>
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
/// <param name="leaseDuration">Duration of the session lease.</param>
/// <param name="openedAt">Timestamp when the session opened.</param>
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
TimeSpan leaseDuration,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(backendName))
{
throw new ArgumentException("Backend name is required.", nameof(backendName));
}
if (string.IsNullOrWhiteSpace(pipeName))
{
throw new ArgumentException("Pipe name is required.", nameof(pipeName));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Nonce is required.", nameof(nonce));
}
SessionId = sessionId;
BackendName = backendName;
PipeName = pipeName;
Nonce = nonce;
ClientIdentity = clientIdentity;
ClientSessionName = clientSessionName;
ClientCorrelationId = clientCorrelationId;
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
LeaseDuration = leaseDuration;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
_leaseExpiresAt = openedAt + leaseDuration;
}
/// <summary>
/// Gets the session identifier.
/// </summary>
public string SessionId { get; }
/// <summary>
/// Gets the backend MXAccess proxy server name.
/// </summary>
public string BackendName { get; }
/// <summary>
/// Gets the named pipe name for gateway-worker IPC.
/// </summary>
public string PipeName { get; }
/// <summary>
/// Gets the security nonce for worker validation.
/// </summary>
public string Nonce { get; }
/// <summary>
/// Gets the client identity from the authentication context.
/// </summary>
public string? ClientIdentity { get; }
/// <summary>
/// Gets the client-supplied session name.
/// </summary>
public string? ClientSessionName { get; }
/// <summary>
/// Gets the client-supplied correlation identifier.
/// </summary>
public string? ClientCorrelationId { get; }
/// <summary>
/// Gets the command invocation timeout.
/// </summary>
public TimeSpan CommandTimeout { get; }
/// <summary>
/// Gets the worker process startup timeout.
/// </summary>
public TimeSpan StartupTimeout { get; }
/// <summary>
/// Gets the worker process shutdown timeout.
/// </summary>
public TimeSpan ShutdownTimeout { get; }
/// <summary>Gets the lease duration for the session.</summary>
public TimeSpan LeaseDuration { get; }
/// <summary>
/// Gets the timestamp when the session opened.
/// </summary>
public DateTimeOffset OpenedAt { get; }
/// <summary>
/// Gets the worker process identifier, or null if not yet attached.
/// </summary>
public int? WorkerProcessId => _workerClient?.ProcessId;
/// <summary>
/// Gets the attached worker client, or null if not yet attached.
/// </summary>
public IWorkerClient? WorkerClient => _workerClient;
/// <summary>
/// Gets the current session state.
/// </summary>
public SessionState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
/// <summary>
/// Gets the timestamp of the most recent client activity.
/// </summary>
public DateTimeOffset LastClientActivityAt
{
get
{
lock (_syncRoot)
{
return _lastClientActivityAt;
}
}
}
/// <summary>
/// Gets the lease expiration timestamp, or null if no lease is active.
/// </summary>
public DateTimeOffset? LeaseExpiresAt
{
get
{
lock (_syncRoot)
{
return _leaseExpiresAt;
}
}
}
/// <summary>
/// Gets the fault description if the session is faulted, or null.
/// </summary>
public string? FinalFault
{
get
{
lock (_syncRoot)
{
return _finalFault;
}
}
}
/// <summary>
/// Gets the count of active event stream subscribers.
/// </summary>
public int ActiveEventSubscriberCount
{
get
{
lock (_syncRoot)
{
return _activeEventSubscriberCount;
}
}
}
/// <summary>
/// Attaches the worker client for this session.
/// </summary>
/// <param name="workerClient">Worker client to attach.</param>
public void AttachWorkerClient(IWorkerClient workerClient)
{
ArgumentNullException.ThrowIfNull(workerClient);
lock (_syncRoot)
{
_workerClient = workerClient;
}
}
/// <summary>
/// Transitions the session to a new state with constraints for terminal states.
/// </summary>
/// <param name="nextState">Next session state to transition to.</param>
/// <remarks>
/// <see cref="SessionState.Closed"/> is terminal. <see cref="SessionState.Faulted"/>
/// only allows a transition to <see cref="SessionState.Closed"/>.
/// <see cref="SessionState.Closing"/> only allows a transition to
/// <see cref="SessionState.Closed"/> (or <see cref="SessionState.Faulted"/>) — once
/// <see cref="CloseAsync"/> has started, no late lifecycle callback can revive the
/// session by walking it back to <see cref="SessionState.Ready"/> or any earlier
/// state. Both close-related writes (<c>Closing</c> and <c>Closed</c>) go through
/// <c>_syncRoot</c> just like every other state read/write, closing the split-lock
/// race called out in Server-015.
/// </remarks>
public void TransitionTo(SessionState nextState)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
{
return;
}
if (_state is SessionState.Closing
&& nextState is not SessionState.Closed
&& nextState is not SessionState.Faulted)
{
return;
}
_state = nextState;
}
}
/// <summary>
/// Transitions the session to the Ready state.
/// </summary>
public void MarkReady()
{
TransitionTo(SessionState.Ready);
}
/// <summary>
/// Transitions the session to the Faulted state with a fault description.
/// </summary>
/// <param name="reason">Reason for the fault.</param>
public void MarkFaulted(string reason)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
_finalFault = reason;
_state = SessionState.Faulted;
}
}
/// <summary>
/// Updates the timestamp of the most recent client activity.
/// </summary>
/// <param name="activityAt">Timestamp of the client activity.</param>
public void TouchClientActivity(DateTimeOffset activityAt)
{
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
_leaseExpiresAt = activityAt + LeaseDuration;
}
}
/// <summary>
/// Extends the session lease to the specified expiration time.
/// </summary>
/// <param name="leaseExpiresAt">Timestamp when the lease expires.</param>
public void ExtendLease(DateTimeOffset leaseExpiresAt)
{
lock (_syncRoot)
{
_leaseExpiresAt = leaseExpiresAt;
}
}
/// <summary>
/// Determines whether the session lease has expired.
/// </summary>
/// <param name="now">Current timestamp for comparison.</param>
public bool IsLeaseExpired(DateTimeOffset now)
{
lock (_syncRoot)
{
return _activeEventSubscriberCount == 0
&& _leaseExpiresAt is not null
&& _leaseExpiresAt <= now;
}
}
/// <summary>
/// Attaches an event subscriber and returns a disposable lease.
/// </summary>
/// <param name="allowMultipleSubscribers">If true, allows multiple concurrent event subscribers.</param>
public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
}
if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
{
throw new SessionManagerException(
SessionManagerErrorCode.EventSubscriberAlreadyActive,
$"Session {SessionId} already has an active event stream subscriber.");
}
_activeEventSubscriberCount++;
return new EventSubscriberLease(this);
}
}
/// <summary>
/// Invokes a worker command synchronously and returns the reply.
/// </summary>
/// <param name="command">Worker command to invoke.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
/// <summary>Gets the item registration for a server and item handle pair.</summary>
/// <param name="serverHandle">The MXAccess server handle.</param>
/// <param name="itemHandle">The MXAccess item handle.</param>
/// <param name="registration">The item registration if found.</param>
public bool TryGetItemRegistration(
int serverHandle,
int itemHandle,
out SessionItemRegistration registration)
{
lock (_syncRoot)
{
return _items.TryGetValue((serverHandle, itemHandle), out registration!);
}
}
/// <summary>Tracks item registrations from a command reply.</summary>
/// <param name="command">The executed command.</param>
/// <param name="reply">The command reply.</param>
public void TrackCommandReply(
MxCommand command,
MxCommandReply reply)
{
if (reply.ProtocolStatus?.Code is not ProtocolStatusCode.Ok)
{
return;
}
lock (_syncRoot)
{
switch (command.Kind)
{
case MxCommandKind.AddItem when reply.AddItem is not null:
TrackItem(command.AddItem.ServerHandle, reply.AddItem.ItemHandle, command.AddItem.ItemDefinition);
break;
case MxCommandKind.AddItem2 when reply.AddItem2 is not null:
TrackItem(command.AddItem2.ServerHandle, reply.AddItem2.ItemHandle, command.AddItem2.ItemDefinition);
break;
case MxCommandKind.AddBufferedItem when reply.AddBufferedItem is not null:
TrackItem(command.AddBufferedItem.ServerHandle, reply.AddBufferedItem.ItemHandle, command.AddBufferedItem.ItemDefinition);
break;
case MxCommandKind.AddItemBulk when reply.AddItemBulk is not null:
TrackBulkItems(reply.AddItemBulk);
break;
case MxCommandKind.SubscribeBulk when reply.SubscribeBulk is not null:
TrackBulkItems(reply.SubscribeBulk);
break;
case MxCommandKind.RemoveItem:
_items.Remove((command.RemoveItem.ServerHandle, command.RemoveItem.ItemHandle));
break;
case MxCommandKind.RemoveItemBulk:
RemoveItems(command.RemoveItemBulk.ServerHandle, command.RemoveItemBulk.ItemHandles);
break;
case MxCommandKind.UnsubscribeBulk:
RemoveItems(command.UnsubscribeBulk.ServerHandle, command.UnsubscribeBulk.ItemHandles);
break;
}
}
}
/// <summary>
/// Executes a bulk add-item command for the specified server and tag addresses.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="tagAddresses">Tag addresses to add.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<SubscribeResult>> AddItemBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
AddItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.TagAddresses.Add(tagAddresses);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.AddItemBulk,
AddItemBulk = bulkCommand,
},
reply => reply.AddItemBulk,
cancellationToken);
}
/// <summary>
/// Executes a bulk advise-item command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to advise.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<SubscribeResult>> AdviseItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
AdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.AdviseItemBulk,
AdviseItemBulk = bulkCommand,
},
reply => reply.AdviseItemBulk,
cancellationToken);
}
/// <summary>
/// Executes a bulk remove-item command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to remove.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<SubscribeResult>> RemoveItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
RemoveItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.RemoveItemBulk,
RemoveItemBulk = bulkCommand,
},
reply => reply.RemoveItemBulk,
cancellationToken);
}
/// <summary>
/// Executes a bulk un-advise-item command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to un-advise.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<SubscribeResult>> UnAdviseItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnAdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.UnAdviseItemBulk,
UnAdviseItemBulk = bulkCommand,
},
reply => reply.UnAdviseItemBulk,
cancellationToken);
}
/// <summary>
/// Executes a bulk subscribe command for the specified server and tag addresses.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="tagAddresses">Tag addresses to subscribe to.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
SubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.TagAddresses.Add(tagAddresses);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.SubscribeBulk,
SubscribeBulk = bulkCommand,
},
reply => reply.SubscribeBulk,
cancellationToken);
}
/// <summary>
/// Executes a bulk unsubscribe command for the specified server and item handles.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="itemHandles">Item handles to unsubscribe from.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<SubscribeResult>> UnsubscribeBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnsubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.UnsubscribeBulk,
UnsubscribeBulk = bulkCommand,
},
reply => reply.UnsubscribeBulk,
cancellationToken);
}
/// <summary>Executes a bulk Write command for the specified server and per-item entries.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<BulkWriteResult>> WriteBulkAsync(
int serverHandle,
IReadOnlyList<WriteBulkEntry> entries,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(entries);
WriteBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.Entries.Add(entries);
return InvokeBulkWriteAsync(
new MxCommand
{
Kind = MxCommandKind.WriteBulk,
WriteBulk = bulkCommand,
},
reply => reply.WriteBulk,
cancellationToken);
}
/// <summary>Executes a bulk Write2 (timestamped) command.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<BulkWriteResult>> Write2BulkAsync(
int serverHandle,
IReadOnlyList<Write2BulkEntry> entries,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(entries);
Write2BulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.Entries.Add(entries);
return InvokeBulkWriteAsync(
new MxCommand
{
Kind = MxCommandKind.Write2Bulk,
Write2Bulk = bulkCommand,
},
reply => reply.Write2Bulk,
cancellationToken);
}
/// <summary>Executes a bulk WriteSecured command.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<BulkWriteResult>> WriteSecuredBulkAsync(
int serverHandle,
IReadOnlyList<WriteSecuredBulkEntry> entries,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(entries);
WriteSecuredBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.Entries.Add(entries);
return InvokeBulkWriteAsync(
new MxCommand
{
Kind = MxCommandKind.WriteSecuredBulk,
WriteSecuredBulk = bulkCommand,
},
reply => reply.WriteSecuredBulk,
cancellationToken);
}
/// <summary>Executes a bulk WriteSecured2 command.</summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="entries">Write entries to execute.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<BulkWriteResult>> WriteSecured2BulkAsync(
int serverHandle,
IReadOnlyList<WriteSecured2BulkEntry> entries,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(entries);
WriteSecured2BulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.Entries.Add(entries);
return InvokeBulkWriteAsync(
new MxCommand
{
Kind = MxCommandKind.WriteSecured2Bulk,
WriteSecured2Bulk = bulkCommand,
},
reply => reply.WriteSecured2Bulk,
cancellationToken);
}
/// <summary>
/// Executes a bulk Read command — see <c>ReadBulkCommand</c>'s doc
/// comment in the .proto for the cached-vs-snapshot semantics.
/// </summary>
/// <param name="serverHandle">Server handle returned by the worker.</param>
/// <param name="tagAddresses">Tag addresses to read.</param>
/// <param name="timeout">Timeout for the read operation.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public Task<IReadOnlyList<BulkReadResult>> ReadBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
TimeSpan timeout,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
ReadBulkCommand bulkCommand = new()
{
ServerHandle = serverHandle,
TimeoutMs = timeout <= TimeSpan.Zero ? 0u : (uint)Math.Min(timeout.TotalMilliseconds, uint.MaxValue),
};
bulkCommand.TagAddresses.Add(tagAddresses);
return InvokeBulkReadAsync(
new MxCommand
{
Kind = MxCommandKind.ReadBulk,
ReadBulk = bulkCommand,
},
reply => reply.ReadBulk,
cancellationToken);
}
/// <summary>
/// Reads events from the worker as an asynchronous enumerable stream.
/// </summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return workerClient.ReadEventsAsync(cancellationToken);
}
/// <summary>
/// Closes the session and shuts down the worker process.
/// </summary>
/// <param name="reason">Reason for closing the session.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <remarks>
/// Concurrent close attempts are serialized by <c>_closeLock</c> so only one close
/// runs at a time, but every read/write of <c>_state</c> still passes through
/// <c>_syncRoot</c> (via <see cref="TryBeginClose"/> and <see cref="MarkClosed"/>) —
/// the close path therefore obeys the same lock discipline as
/// <see cref="TransitionTo"/> / <see cref="MarkFaulted"/> and a concurrent
/// <c>TransitionTo(Ready)</c> cannot race past a <c>Closing</c> write.
/// </remarks>
public async Task<SessionCloseResult> CloseAsync(
string reason,
CancellationToken cancellationToken)
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
if (!TryBeginClose(out bool alreadyClosing))
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
}
if (_workerClient is not null)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
try
{
_workerClient.Kill(reason);
}
catch (Exception killException)
{
throw new SessionCloseStartedException(
$"Session {SessionId} close failed after worker shutdown started.",
new AggregateException(exception, killException));
}
throw;
}
}
MarkClosed();
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
catch (Exception exception) when (exception is not SessionCloseStartedException)
{
throw new SessionCloseStartedException(
$"Session {SessionId} close failed after the close lock was acquired.",
exception);
}
}
finally
{
_closeLock.Release();
}
}
// Returns false when the session is already Closed (caller short-circuits with
// AlreadyClosed: true). Otherwise sets _state = Closing under _syncRoot so a
// concurrent TransitionTo(Ready) — which only refuses to overwrite Closed/Faulted
// — cannot flip the session back to Ready after close started. The `alreadyClosing`
// out parameter mirrors the previous `_closeStarted` check so the surface contract
// (a second concurrent close returns AlreadyClosed: alreadyClosing) is preserved.
private bool TryBeginClose(out bool alreadyClosing)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
alreadyClosing = _closeStarted;
return false;
}
alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
return true;
}
}
// Final terminal transition; under _syncRoot to keep _state writes single-lock.
// Closed is unconditionally terminal — TransitionTo refuses to overwrite it —
// so we don't need to re-check the precondition here.
private void MarkClosed()
{
lock (_syncRoot)
{
_state = SessionState.Closed;
}
}
/// <summary>
/// Terminates the worker process immediately.
/// </summary>
/// <param name="reason">Reason for killing the worker.</param>
public void KillWorker(string reason)
{
_workerClient?.Kill(reason);
TransitionTo(SessionState.Closed);
}
/// <summary>
/// Terminates the worker process immediately while holding the per-session
/// close lock so concurrent close/kill callers serialize. Returns the
/// session state observed at the start of the call so the caller can
/// dedup metric accounting (e.g. only record <c>SessionClosed</c> when
/// the session was not already closed).
/// </summary>
/// <remarks>
/// Mirrors <see cref="CloseAsync"/>'s use of <c>_closeLock</c> so that
/// a Close in flight from one caller and a Kill from another do not
/// race on the "was the session already closed" observation that
/// drives metric increments (Server-045).
/// </remarks>
/// <param name="reason">Reason for killing the worker.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns><c>true</c> if the session was already <see cref="SessionState.Closed"/> when the lock was acquired; otherwise <c>false</c>.</returns>
public async ValueTask<bool> KillWorkerWithCloseGateAsync(
string reason,
CancellationToken cancellationToken)
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
bool wasClosed;
lock (_syncRoot)
{
wasClosed = _state == SessionState.Closed;
}
_workerClient?.Kill(reason);
TransitionTo(SessionState.Closed);
return wasClosed;
}
finally
{
_closeLock.Release();
}
}
/// <summary>
/// Disposes the session and frees associated resources.
/// </summary>
/// <remarks>
/// Acquires <c>_closeLock</c> once before disposing so an in-flight
/// <see cref="CloseAsync"/> finishes before the semaphore is released and
/// reclaimed. Without this gate, the in-flight close's <c>_closeLock.Release()</c>
/// would race the dispose and raise <see cref="ObjectDisposedException"/>.
/// The acquire is best-effort: a non-cancellable wait that swallows
/// <see cref="ObjectDisposedException"/> so double-dispose still completes.
/// </remarks>
public async ValueTask DisposeAsync()
{
try
{
// CancellationToken.None — disposal must not be cancelled, and a misbehaving
// close path that never releases would have to be torn down by the worker
// shutdown timeout long before we reach here.
await _closeLock.WaitAsync(CancellationToken.None).ConfigureAwait(false);
try
{
// Hand the slot back so the semaphore's internal counter is consistent
// for any contemporaneous waiter, then dispose. Once disposed, every
// subsequent WaitAsync / Release will throw — but DisposeAsync's contract
// is "no concurrent close after this point", which SessionManager honors.
_closeLock.Release();
}
catch (ObjectDisposedException)
{
}
}
catch (ObjectDisposedException)
{
// Already disposed (e.g. double-dispose); nothing to gate on.
}
try
{
_closeLock.Dispose();
}
catch (ObjectDisposedException)
{
}
if (_workerClient is not null)
{
await _workerClient.DisposeAsync().ConfigureAwait(false);
}
}
private async Task<IReadOnlyList<SubscribeResult>> InvokeBulkAsync(
MxCommand command,
Func<MxCommandReply, BulkSubscribeReply?> payloadAccessor,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeBulkInternalAsync(command, cancellationToken).ConfigureAwait(false);
return payloadAccessor(reply)?.Results.ToArray() ?? [];
}
private async Task<IReadOnlyList<BulkWriteResult>> InvokeBulkWriteAsync(
MxCommand command,
Func<MxCommandReply, BulkWriteReply?> payloadAccessor,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeBulkInternalAsync(command, cancellationToken).ConfigureAwait(false);
return payloadAccessor(reply)?.Results.ToArray() ?? [];
}
private async Task<IReadOnlyList<BulkReadResult>> InvokeBulkReadAsync(
MxCommand command,
Func<MxCommandReply, BulkReadReply?> payloadAccessor,
CancellationToken cancellationToken)
{
MxCommandReply reply = await InvokeBulkInternalAsync(command, cancellationToken).ConfigureAwait(false);
return payloadAccessor(reply)?.Results.ToArray() ?? [];
}
// Single round-trip + protocol-status check shared by every bulk variant.
// Callers project the typed reply payload out via their own accessor — the
// outer envelope handling is identical across SubscribeResult-based bulks,
// BulkWriteResult-based writes, and BulkReadResult-based reads.
private async Task<MxCommandReply> InvokeBulkInternalAsync(
MxCommand command,
CancellationToken cancellationToken)
{
WorkerCommandReply workerReply = await InvokeAsync(
new WorkerCommand { Command = command },
cancellationToken)
.ConfigureAwait(false);
MxCommandReply reply = workerReply.Reply ?? new MxCommandReply
{
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = "Worker command reply did not contain a public reply payload.",
},
};
if (reply.ProtocolStatus?.Code is not ProtocolStatusCode.Ok)
{
string message = reply.ProtocolStatus?.Message ?? reply.DiagnosticMessage;
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
string.IsNullOrWhiteSpace(message) ? "Bulk MXAccess command failed." : message);
}
return reply;
}
/// <summary>
/// Returns the worker client iff both the gateway-side session state AND
/// the worker client's own state are <see cref="SessionState.Ready"/> /
/// <see cref="WorkerClientState.Ready"/>. The two states can diverge under
/// load: <c>_state</c> only transitions on gateway-driven events (open,
/// close, fault), while <see cref="WorkerClient.State"/> can shift on
/// worker-side signals (heartbeat watchdog, pipe disconnect) before the
/// gateway's session-level reaction observes them. When that happens the
/// in-flight RPC fails fast here with both states surfaced in the
/// diagnostic (Server-030) so the actual mismatch is actionable instead
/// of misleading. The session usually transitions to <c>Faulted</c>
/// shortly after.
/// </summary>
private IWorkerClient GetReadyWorkerClient()
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
string workerState = _workerClient is null
? "<no worker>"
: _workerClient.State.ToString();
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready. Session state is {_state}; worker state is {workerState}.");
}
return _workerClient;
}
}
private void TrackItem(
int serverHandle,
int itemHandle,
string tagAddress)
{
if (itemHandle == 0 || string.IsNullOrWhiteSpace(tagAddress))
{
return;
}
_items[(serverHandle, itemHandle)] = new SessionItemRegistration(serverHandle, itemHandle, tagAddress);
}
private void TrackBulkItems(BulkSubscribeReply reply)
{
foreach (SubscribeResult result in reply.Results)
{
if (result.WasSuccessful)
{
TrackItem(result.ServerHandle, result.ItemHandle, result.TagAddress);
}
}
}
private void RemoveItems(
int serverHandle,
IEnumerable<int> itemHandles)
{
foreach (int itemHandle in itemHandles)
{
_items.Remove((serverHandle, itemHandle));
}
}
private void DetachEventSubscriber()
{
lock (_syncRoot)
{
if (_activeEventSubscriberCount > 0)
{
_activeEventSubscriberCount--;
}
}
}
private sealed class EventSubscriberLease(GatewaySession session) : IDisposable
{
private bool _disposed;
/// <summary>
/// Disposes the lease and detaches the event subscriber.
/// </summary>
public void Dispose()
{
if (_disposed)
{
return;
}
session.DetachEventSubscriber();
_disposed = true;
}
}
}