using MxGateway.Contracts.Proto; namespace MxGateway.Client; /// /// Represents one gateway-backed MXAccess session. /// public sealed class MxGatewaySession : IAsyncDisposable { private readonly MxGatewayClient _client; private readonly SemaphoreSlim _closeLock = new(1, 1); private readonly object _disposeGate = new(); private CloseSessionReply? _closeReply; private int _activeCloseCount; private bool _closeLockDisposed; /// /// Initializes a new session backed by the given MXAccess gateway client. /// /// The gateway client used for commands and events. /// The server's session creation response. internal MxGatewaySession( MxGatewayClient client, OpenSessionReply openSessionReply) { _client = client ?? throw new ArgumentNullException(nameof(client)); OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply)); } /// /// The session ID assigned by the gateway. /// public string SessionId => OpenSessionReply.SessionId; /// /// The server's session creation response containing metadata. /// public OpenSessionReply OpenSessionReply { get; } /// /// Closes the session on the gateway. Idempotent. /// /// Cancellation token. /// The server's close-session reply. public async Task CloseAsync(CancellationToken cancellationToken = default) { if (_closeReply is not null) { return _closeReply; } // Register as an in-flight closer under the dispose gate. DisposeAsync waits for // _activeCloseCount to drain before disposing the close lock, so the semaphore is // guaranteed to outlive every WaitAsync started here. lock (_disposeGate) { ObjectDisposedException.ThrowIf(_closeLockDisposed, this); _activeCloseCount++; } try { await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (_closeReply is not null) { return _closeReply; } _closeReply = await _client.CloseSessionRawAsync( new CloseSessionRequest { SessionId = SessionId }, cancellationToken) .ConfigureAwait(false); return _closeReply; } finally { _closeLock.Release(); } } finally { lock (_disposeGate) { _activeCloseCount--; } } } /// /// Registers a client with the MXAccess session, returning a ServerHandle. /// /// Name to register. /// Cancellation token. /// The server handle assigned to the registered client. public async Task RegisterAsync( string clientName, CancellationToken cancellationToken = default) { MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.Register?.ServerHandle ?? throw CreateMissingPayloadException(reply, "register"); } /// /// Registers a client with the MXAccess session without error checking. /// /// Name to register. /// Cancellation token. /// The raw server reply. public Task RegisterRawAsync( string clientName, CancellationToken cancellationToken = default) { ArgumentException.ThrowIfNullOrWhiteSpace(clientName); return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.Register, Register = new RegisterCommand { ClientName = clientName }, }, cancellationToken); } /// /// Adds an item to the MXAccess session, returning an ItemHandle. /// /// The ServerHandle from register. /// The item tag address. /// Cancellation token. /// The item handle assigned to the new item. public async Task AddItemAsync( int serverHandle, string itemDefinition, CancellationToken cancellationToken = default) { MxCommandReply reply = await AddItemRawAsync( serverHandle, itemDefinition, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.AddItem?.ItemHandle ?? throw CreateMissingPayloadException(reply, "add_item"); } /// /// Adds an item to the MXAccess session without error checking. /// /// The ServerHandle from register. /// The item tag address. /// Cancellation token. /// The raw server reply. public Task AddItemRawAsync( int serverHandle, string itemDefinition, CancellationToken cancellationToken = default) { ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition); return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.AddItem, AddItem = new AddItemCommand { ServerHandle = serverHandle, ItemDefinition = itemDefinition, }, }, cancellationToken); } /// /// Adds an item with context to the MXAccess session, returning an ItemHandle. /// /// The ServerHandle from register. /// The item tag address. /// Additional context for the item. /// Cancellation token. /// The item handle assigned to the new item. public async Task AddItem2Async( int serverHandle, string itemDefinition, string itemContext, CancellationToken cancellationToken = default) { MxCommandReply reply = await AddItem2RawAsync( serverHandle, itemDefinition, itemContext, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.AddItem2?.ItemHandle ?? throw CreateMissingPayloadException(reply, "add_item2"); } /// /// Adds an item with context to the MXAccess session without error checking. /// /// The ServerHandle from register. /// The item tag address. /// Additional context for the item. /// Cancellation token. /// The raw server reply. public Task AddItem2RawAsync( int serverHandle, string itemDefinition, string itemContext, CancellationToken cancellationToken = default) { ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition); return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.AddItem2, AddItem2 = new AddItem2Command { ServerHandle = serverHandle, ItemDefinition = itemDefinition, ItemContext = itemContext ?? string.Empty, }, }, cancellationToken); } /// /// Subscribes to events for an item (advises in MXAccess terminology). /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// Cancellation token. public async Task AdviseAsync( int serverHandle, int itemHandle, CancellationToken cancellationToken = default) { MxCommandReply reply = await AdviseRawAsync(serverHandle, itemHandle, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); } /// /// Subscribes to events for an item without error checking. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// Cancellation token. /// The raw server reply. public Task AdviseRawAsync( int serverHandle, int itemHandle, CancellationToken cancellationToken = default) { return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.Advise, Advise = new AdviseCommand { ServerHandle = serverHandle, ItemHandle = itemHandle, }, }, cancellationToken); } /// /// Unsubscribes from events for an item (unadvises in MXAccess terminology). /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// Cancellation token. public async Task UnAdviseAsync( int serverHandle, int itemHandle, CancellationToken cancellationToken = default) { MxCommandReply reply = await UnAdviseRawAsync(serverHandle, itemHandle, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); } /// /// Unsubscribes from events for an item without error checking. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// Cancellation token. /// The raw server reply. public Task UnAdviseRawAsync( int serverHandle, int itemHandle, CancellationToken cancellationToken = default) { return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.UnAdvise, UnAdvise = new UnAdviseCommand { ServerHandle = serverHandle, ItemHandle = itemHandle, }, }, cancellationToken); } /// /// Removes an item from the MXAccess session. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// Cancellation token. public async Task RemoveItemAsync( int serverHandle, int itemHandle, CancellationToken cancellationToken = default) { MxCommandReply reply = await RemoveItemRawAsync(serverHandle, itemHandle, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); } /// /// Removes an item from the MXAccess session without error checking. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// Cancellation token. /// The raw server reply. public Task RemoveItemRawAsync( int serverHandle, int itemHandle, CancellationToken cancellationToken = default) { return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.RemoveItem, RemoveItem = new RemoveItemCommand { ServerHandle = serverHandle, ItemHandle = itemHandle, }, }, cancellationToken); } /// /// Adds multiple items to the MXAccess session in a single command. /// /// The ServerHandle from register. /// The item tag addresses to add. /// Cancellation token. /// Per-item subscription results. public async Task> AddItemBulkAsync( int serverHandle, IReadOnlyList tagAddresses, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(tagAddresses); AddItemBulkCommand command = new() { ServerHandle = serverHandle }; command.TagAddresses.Add(tagAddresses); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.AddItemBulk, AddItemBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.AddItemBulk?.Results.ToArray() ?? []; } /// /// Advises multiple items in a single command. /// /// The ServerHandle from register. /// The ItemHandles to advise. /// Cancellation token. /// Per-item subscription results. public async Task> AdviseItemBulkAsync( int serverHandle, IReadOnlyList itemHandles, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(itemHandles); AdviseItemBulkCommand command = new() { ServerHandle = serverHandle }; command.ItemHandles.Add(itemHandles); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.AdviseItemBulk, AdviseItemBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.AdviseItemBulk?.Results.ToArray() ?? []; } /// /// Removes multiple items in a single command. /// /// The ServerHandle from register. /// The ItemHandles to remove. /// Cancellation token. /// Per-item subscription results. public async Task> RemoveItemBulkAsync( int serverHandle, IReadOnlyList itemHandles, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(itemHandles); RemoveItemBulkCommand command = new() { ServerHandle = serverHandle }; command.ItemHandles.Add(itemHandles); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.RemoveItemBulk, RemoveItemBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.RemoveItemBulk?.Results.ToArray() ?? []; } /// /// Unadvises multiple items in a single command. /// /// The ServerHandle from register. /// The ItemHandles to unadvise. /// Cancellation token. /// Per-item subscription results. public async Task> UnAdviseItemBulkAsync( int serverHandle, IReadOnlyList itemHandles, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(itemHandles); UnAdviseItemBulkCommand command = new() { ServerHandle = serverHandle }; command.ItemHandles.Add(itemHandles); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.UnAdviseItemBulk, UnAdviseItemBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.UnAdviseItemBulk?.Results.ToArray() ?? []; } /// /// Adds and advises multiple items in a single command. /// /// The ServerHandle from register. /// The item tag addresses to add and advise. /// Cancellation token. /// Per-item subscription results. public async Task> SubscribeBulkAsync( int serverHandle, IReadOnlyList tagAddresses, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(tagAddresses); SubscribeBulkCommand command = new() { ServerHandle = serverHandle }; command.TagAddresses.Add(tagAddresses); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.SubscribeBulk, SubscribeBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.SubscribeBulk?.Results.ToArray() ?? []; } /// /// Unadvises and removes multiple items in a single command. /// /// The ServerHandle from register. /// The ItemHandles to unsubscribe. /// Cancellation token. /// Per-item subscription results. public async Task> UnsubscribeBulkAsync( int serverHandle, IReadOnlyList itemHandles, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(itemHandles); UnsubscribeBulkCommand command = new() { ServerHandle = serverHandle }; command.ItemHandles.Add(itemHandles); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.UnsubscribeBulk, UnsubscribeBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.UnsubscribeBulk?.Results.ToArray() ?? []; } /// /// Bulk Write — sequential MXAccess Write per entry on the worker's STA. /// Per-item failures appear as BulkWriteResult entries with /// WasSuccessful = false; the call never throws on per-item errors. /// Protocol-level failures still throw via EnsureProtocolSuccess. /// public async Task> WriteBulkAsync( int serverHandle, IReadOnlyList entries, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(entries); WriteBulkCommand command = new() { ServerHandle = serverHandle }; command.Entries.Add(entries); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.WriteBulk, WriteBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.WriteBulk?.Results.ToArray() ?? []; } /// Bulk Write2 — sequential MXAccess Write2 (timestamped) per entry. public async Task> Write2BulkAsync( int serverHandle, IReadOnlyList entries, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(entries); Write2BulkCommand command = new() { ServerHandle = serverHandle }; command.Entries.Add(entries); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.Write2Bulk, Write2Bulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.Write2Bulk?.Results.ToArray() ?? []; } /// /// Bulk WriteSecured — sequential MXAccess WriteSecured per entry. /// Credential-sensitive values must never reach logs; the client mirrors /// the single-item WriteSecured redaction contract. /// public async Task> WriteSecuredBulkAsync( int serverHandle, IReadOnlyList entries, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(entries); WriteSecuredBulkCommand command = new() { ServerHandle = serverHandle }; command.Entries.Add(entries); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.WriteSecuredBulk, WriteSecuredBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.WriteSecuredBulk?.Results.ToArray() ?? []; } /// Bulk WriteSecured2 — sequential MXAccess WriteSecured2 (timestamped) per entry. public async Task> WriteSecured2BulkAsync( int serverHandle, IReadOnlyList entries, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(entries); WriteSecured2BulkCommand command = new() { ServerHandle = serverHandle }; command.Entries.Add(entries); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.WriteSecured2Bulk, WriteSecured2Bulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.WriteSecured2Bulk?.Results.ToArray() ?? []; } /// /// Bulk Read — snapshot the current value for each requested tag. /// Returns the cached OnDataChange value when the tag is already advised /// (was_cached = true), otherwise the worker takes the full AddItem + /// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle. Per-tag /// failures (timeout, invalid tag) appear as BulkReadResult entries with /// WasSuccessful = false; the call never throws on per-tag errors. /// public async Task> ReadBulkAsync( int serverHandle, IReadOnlyList tagAddresses, TimeSpan timeout, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(tagAddresses); ReadBulkCommand command = new() { ServerHandle = serverHandle, TimeoutMs = timeout <= TimeSpan.Zero ? 0u : (uint)Math.Min(timeout.TotalMilliseconds, uint.MaxValue), }; command.TagAddresses.Add(tagAddresses); MxCommandReply reply = await InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.ReadBulk, ReadBulk = command, }, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); return reply.ReadBulk?.Results.ToArray() ?? []; } /// /// Writes a value to an item on the MXAccess server. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// The value to write. /// User ID context for the write. /// Cancellation token. public async Task WriteAsync( int serverHandle, int itemHandle, MxValue value, int userId, CancellationToken cancellationToken = default) { MxCommandReply reply = await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); } /// /// Writes a value to an item on the MXAccess server without error checking. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// The value to write. /// User ID context for the write. /// Cancellation token. /// The raw server reply. public Task WriteRawAsync( int serverHandle, int itemHandle, MxValue value, int userId, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(value); return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.Write, Write = new WriteCommand { ServerHandle = serverHandle, ItemHandle = itemHandle, Value = value, UserId = userId, }, }, cancellationToken); } /// /// Writes a value and timestamp to an item on the MXAccess server. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// The value to write. /// The timestamp to write with the value. /// User ID context for the write. /// Cancellation token. public async Task Write2Async( int serverHandle, int itemHandle, MxValue value, MxValue timestampValue, int userId, CancellationToken cancellationToken = default) { MxCommandReply reply = await Write2RawAsync( serverHandle, itemHandle, value, timestampValue, userId, cancellationToken) .ConfigureAwait(false); reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); } /// /// Writes a value and timestamp to an item on the MXAccess server without error checking. /// /// The ServerHandle from register. /// The ItemHandle from add-item. /// The value to write. /// The timestamp to write with the value. /// User ID context for the write. /// Cancellation token. /// The raw server reply. public Task Write2RawAsync( int serverHandle, int itemHandle, MxValue value, MxValue timestampValue, int userId, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(value); ArgumentNullException.ThrowIfNull(timestampValue); return InvokeCommandAsync( new MxCommand { Kind = MxCommandKind.Write2, Write2 = new Write2Command { ServerHandle = serverHandle, ItemHandle = itemHandle, Value = value, TimestampValue = timestampValue, UserId = userId, }, }, cancellationToken); } /// /// Invokes an MXAccess command on this session. /// /// The command request. /// Cancellation token. /// The raw server reply. public Task InvokeAsync( MxCommandRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); return _client.InvokeAsync(request, cancellationToken); } /// /// Streams events from the worker for this session, optionally starting after a given sequence number. /// /// The sequence number to stream from. Defaults to 0. /// Cancellation token. /// An async enumerable of events. public IAsyncEnumerable StreamEventsAsync( ulong afterWorkerSequence = 0, CancellationToken cancellationToken = default) { return _client.StreamEventsAsync( new StreamEventsRequest { SessionId = SessionId, AfterWorkerSequence = afterWorkerSequence, }, cancellationToken); } /// /// Closes the session and releases resources. /// public async ValueTask DisposeAsync() { lock (_disposeGate) { if (_closeLockDisposed) { return; } } await CloseAsync().ConfigureAwait(false); // Wait for every concurrent CloseAsync caller to leave the close lock before // disposing it; once _closeReply is set those callers return without awaiting. while (true) { lock (_disposeGate) { if (_activeCloseCount == 0) { _closeLockDisposed = true; break; } } await Task.Yield(); } _closeLock.Dispose(); } private Task InvokeCommandAsync( MxCommand command, CancellationToken cancellationToken) { return _client.InvokeAsync( new MxCommandRequest { SessionId = SessionId, ClientCorrelationId = Guid.NewGuid().ToString("N"), Command = command, }, cancellationToken); } /// /// Builds the exception thrown when a command reply passed protocol and /// MXAccess success checks but is missing the typed handle-bearing payload /// the command contract requires. Surfacing this as a clear error avoids /// silently handing a zero handle to the caller (it would otherwise fall /// through to , which is 0 when the /// reply carries no return value). /// private static MxGatewayException CreateMissingPayloadException( MxCommandReply reply, string expectedPayload) { return new MxGatewayException( $"Gateway reply for command kind={reply.Kind} reported success but is missing " + $"the required '{expectedPayload}' payload; cannot resolve a handle. " + $"session={reply.SessionId}; correlation={reply.CorrelationId}"); } }