using MxGateway.Contracts.Proto;

namespace MxGateway.Client;

// NOTE(review): several signatures below read `Task`, `Task>`, `IReadOnlyList` or
// `IAsyncEnumerable` with no type arguments even though their bodies return values.
// The generic arguments look to have been lost before this listing was reviewed;
// confirm against the canonical file before building.

/// <summary>
/// Represents one gateway-backed MXAccess session.
/// </summary>
/// <remarks>
/// Every command is sent through the owning <c>MxGatewayClient</c> and is stamped with this
/// session's <see cref="SessionId"/>. Most operations come in two flavours: an <c>XxxAsync</c>
/// method that validates the reply and unwraps the result, and an <c>XxxRawAsync</c> method that
/// returns the reply untouched.
/// </remarks>
public sealed class MxGatewaySession : IAsyncDisposable
{
    // Client that carries all commands, the close call and the event stream for this session.
    private readonly MxGatewayClient _client;

    // Binary semaphore (initial 1, max 1) that serializes concurrent CloseAsync callers.
    private readonly SemaphoreSlim _closeLock = new(1, 1);

    // Cached reply of the first successful close; null until a close call has completed.
    private CloseSessionReply? _closeReply;

    /// <summary>
    /// Initializes a new session backed by the given MXAccess gateway client.
    /// </summary>
    /// <param name="client">The gateway client used for commands and events.</param>
    /// <param name="openSessionReply">The server's session creation response.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="client"/> or <paramref name="openSessionReply"/> is <c>null</c>.
    /// </exception>
    internal MxGatewaySession(
        MxGatewayClient client,
        OpenSessionReply openSessionReply)
    {
        _client = client ?? throw new ArgumentNullException(nameof(client));
        OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply));
    }

    /// <summary>
    /// The session ID assigned by the gateway, read from <see cref="OpenSessionReply"/>.
    /// </summary>
    public string SessionId => OpenSessionReply.SessionId;

    /// <summary>
    /// The server's session creation response containing metadata.
    /// </summary>
    public OpenSessionReply OpenSessionReply { get; }

    /// <summary>
    /// Closes the session on the gateway. Idempotent: once a close has completed, the cached
    /// reply is returned and no further request is sent.
    /// </summary>
    /// <param name="cancellationToken">
    /// Cancellation token; applies both to waiting for the close lock and to the close request.
    /// </param>
    /// <returns>The server's close-session reply.</returns>
    public async Task CloseAsync(CancellationToken cancellationToken = default)
    {
        // Fast path: already closed, no need to take the lock.
        if (_closeReply is not null)
        {
            return _closeReply;
        }

        await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Re-check under the lock: another caller may have finished closing while we waited.
            if (_closeReply is not null)
            {
                return _closeReply;
            }

            // The reply is cached only after the call completes, so a failed or cancelled
            // close leaves _closeReply null and a later call sends the request again.
            _closeReply = await _client.CloseSessionRawAsync(
                new CloseSessionRequest { SessionId = SessionId },
                cancellationToken)
                .ConfigureAwait(false);
            return _closeReply;
        }
        finally
        {
            _closeLock.Release();
        }
    }

    /// <summary>
    /// Registers a client with the MXAccess session, returning a ServerHandle.
    /// </summary>
    /// <remarks>
    /// The reply is passed through <c>EnsureProtocolSuccess</c> and <c>EnsureMxAccessSuccess</c>
    /// (defined elsewhere; presumably they throw when the reply reports a failure).
    /// </remarks>
    /// <param name="clientName">Name to register.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The server handle assigned to the registered client.</returns>
    public async Task RegisterAsync(
        string clientName,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();

        // Prefer the typed Register payload; fall back to the generic return value when absent.
        // NOTE(review): ReturnValue is dereferenced without a null check — confirm it is always set.
        return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
    }

    /// <summary>
    /// Registers a client with the MXAccess session without error checking.
    /// </summary>
    /// <param name="clientName">Name to register.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="clientName"/> is <c>null</c>, empty or whitespace. Because this method is
    /// not <c>async</c>, the exception is thrown at the call rather than through the returned task.
    /// </exception>
    public Task RegisterRawAsync(
        string clientName,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(clientName);
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.Register,
                Register = new RegisterCommand { ClientName = clientName },
            },
            cancellationToken);
    }

    /// <summary>
    /// Adds an item to the MXAccess session, returning an ItemHandle.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemDefinition">The item tag address.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The item handle assigned to the new item.</returns>
    public async Task AddItemAsync(
        int serverHandle,
        string itemDefinition,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await AddItemRawAsync(
            serverHandle,
            itemDefinition,
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();

        // Prefer the typed AddItem payload; fall back to the generic return value when absent.
        return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
    }

    /// <summary>
    /// Adds an item to the MXAccess session without error checking.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemDefinition">The item tag address.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="itemDefinition"/> is <c>null</c>, empty or whitespace.
    /// </exception>
    public Task AddItemRawAsync(
        int serverHandle,
        string itemDefinition,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemDefinition = itemDefinition,
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Adds an item with context to the MXAccess session, returning an ItemHandle.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemDefinition">The item tag address.</param>
    /// <param name="itemContext">Additional context for the item.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The item handle assigned to the new item.</returns>
    public async Task AddItem2Async(
        int serverHandle,
        string itemDefinition,
        string itemContext,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await AddItem2RawAsync(
            serverHandle,
            itemDefinition,
            itemContext,
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();

        // Prefer the typed AddItem2 payload; fall back to the generic return value when absent.
        return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
    }

    /// <summary>
    /// Adds an item with context to the MXAccess session without error checking.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemDefinition">The item tag address.</param>
    /// <param name="itemContext">
    /// Additional context for the item; a <c>null</c> value is sent as an empty string.
    /// </param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="itemDefinition"/> is <c>null</c>, empty or whitespace.
    /// </exception>
    public Task AddItem2RawAsync(
        int serverHandle,
        string itemDefinition,
        string itemContext,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.AddItem2,
                AddItem2 = new AddItem2Command
                {
                    ServerHandle = serverHandle,
                    ItemDefinition = itemDefinition,
                    ItemContext = itemContext ?? string.Empty,
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Subscribes to events for an item (advises in MXAccess terminology).
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    public async Task AdviseAsync(
        int serverHandle,
        int itemHandle,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
    }

    /// <summary>
    /// Subscribes to events for an item without error checking.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    public Task AdviseRawAsync(
        int serverHandle,
        int itemHandle,
        CancellationToken cancellationToken = default)
    {
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.Advise,
                Advise = new AdviseCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Unsubscribes from events for an item (unadvises in MXAccess terminology).
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    public async Task UnAdviseAsync(
        int serverHandle,
        int itemHandle,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await UnAdviseRawAsync(serverHandle, itemHandle, cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
    }

    /// <summary>
    /// Unsubscribes from events for an item without error checking.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    public Task UnAdviseRawAsync(
        int serverHandle,
        int itemHandle,
        CancellationToken cancellationToken = default)
    {
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.UnAdvise,
                UnAdvise = new UnAdviseCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Removes an item from the MXAccess session.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    public async Task RemoveItemAsync(
        int serverHandle,
        int itemHandle,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await RemoveItemRawAsync(serverHandle, itemHandle, cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
    }

    /// <summary>
    /// Removes an item from the MXAccess session without error checking.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    public Task RemoveItemRawAsync(
        int serverHandle,
        int itemHandle,
        CancellationToken cancellationToken = default)
    {
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.RemoveItem,
                RemoveItem = new RemoveItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Adds multiple items to the MXAccess session in a single command.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="tagAddresses">The item tag addresses to add.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// Per-item subscription results, copied into an array; empty when the reply carries no
    /// <c>AddItemBulk</c> payload.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="tagAddresses"/> is <c>null</c>.</exception>
    public async Task> AddItemBulkAsync(
        int serverHandle,
        IReadOnlyList tagAddresses,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(tagAddresses);

        // The address collection has no setter here, so the command is built first and filled after.
        AddItemBulkCommand command = new() { ServerHandle = serverHandle };
        command.TagAddresses.Add(tagAddresses);

        MxCommandReply reply = await InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.AddItemBulk,
                AddItemBulk = command,
            },
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
        return reply.AddItemBulk?.Results.ToArray() ?? [];
    }

    /// <summary>
    /// Advises multiple items in a single command.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandles">The ItemHandles to advise.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// Per-item subscription results, copied into an array; empty when the reply carries no
    /// <c>AdviseItemBulk</c> payload.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is <c>null</c>.</exception>
    public async Task> AdviseItemBulkAsync(
        int serverHandle,
        IReadOnlyList itemHandles,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        AdviseItemBulkCommand command = new() { ServerHandle = serverHandle };
        command.ItemHandles.Add(itemHandles);

        MxCommandReply reply = await InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.AdviseItemBulk,
                AdviseItemBulk = command,
            },
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
        return reply.AdviseItemBulk?.Results.ToArray() ?? [];
    }

    /// <summary>
    /// Removes multiple items in a single command.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandles">The ItemHandles to remove.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// Per-item subscription results, copied into an array; empty when the reply carries no
    /// <c>RemoveItemBulk</c> payload.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is <c>null</c>.</exception>
    public async Task> RemoveItemBulkAsync(
        int serverHandle,
        IReadOnlyList itemHandles,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        RemoveItemBulkCommand command = new() { ServerHandle = serverHandle };
        command.ItemHandles.Add(itemHandles);

        MxCommandReply reply = await InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.RemoveItemBulk,
                RemoveItemBulk = command,
            },
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
        return reply.RemoveItemBulk?.Results.ToArray() ?? [];
    }

    /// <summary>
    /// Unadvises multiple items in a single command.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandles">The ItemHandles to unadvise.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// Per-item subscription results, copied into an array; empty when the reply carries no
    /// <c>UnAdviseItemBulk</c> payload.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is <c>null</c>.</exception>
    public async Task> UnAdviseItemBulkAsync(
        int serverHandle,
        IReadOnlyList itemHandles,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        UnAdviseItemBulkCommand command = new() { ServerHandle = serverHandle };
        command.ItemHandles.Add(itemHandles);

        MxCommandReply reply = await InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.UnAdviseItemBulk,
                UnAdviseItemBulk = command,
            },
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
        return reply.UnAdviseItemBulk?.Results.ToArray() ?? [];
    }

    /// <summary>
    /// Adds and advises multiple items in a single command.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="tagAddresses">The item tag addresses to add and advise.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// Per-item subscription results, copied into an array; empty when the reply carries no
    /// <c>SubscribeBulk</c> payload.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="tagAddresses"/> is <c>null</c>.</exception>
    public async Task> SubscribeBulkAsync(
        int serverHandle,
        IReadOnlyList tagAddresses,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(tagAddresses);

        SubscribeBulkCommand command = new() { ServerHandle = serverHandle };
        command.TagAddresses.Add(tagAddresses);

        MxCommandReply reply = await InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.SubscribeBulk,
                SubscribeBulk = command,
            },
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
        return reply.SubscribeBulk?.Results.ToArray() ?? [];
    }

    /// <summary>
    /// Unadvises and removes multiple items in a single command.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandles">The ItemHandles to unsubscribe.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// Per-item subscription results, copied into an array; empty when the reply carries no
    /// <c>UnsubscribeBulk</c> payload.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemHandles"/> is <c>null</c>.</exception>
    public async Task> UnsubscribeBulkAsync(
        int serverHandle,
        IReadOnlyList itemHandles,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(itemHandles);

        UnsubscribeBulkCommand command = new() { ServerHandle = serverHandle };
        command.ItemHandles.Add(itemHandles);

        MxCommandReply reply = await InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.UnsubscribeBulk,
                UnsubscribeBulk = command,
            },
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
        return reply.UnsubscribeBulk?.Results.ToArray() ?? [];
    }

    /// <summary>
    /// Writes a value to an item on the MXAccess server.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="value">The value to write.</param>
    /// <param name="userId">User ID context for the write.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    public async Task WriteAsync(
        int serverHandle,
        int itemHandle,
        MxValue value,
        int userId,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
    }

    /// <summary>
    /// Writes a value to an item on the MXAccess server without error checking.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="value">The value to write.</param>
    /// <param name="userId">User ID context for the write.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="value"/> is <c>null</c>.</exception>
    public Task WriteRawAsync(
        int serverHandle,
        int itemHandle,
        MxValue value,
        int userId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(value);
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.Write,
                Write = new WriteCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                    Value = value,
                    UserId = userId,
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Writes a value and timestamp to an item on the MXAccess server.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="value">The value to write.</param>
    /// <param name="timestampValue">The timestamp to write with the value.</param>
    /// <param name="userId">User ID context for the write.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    public async Task Write2Async(
        int serverHandle,
        int itemHandle,
        MxValue value,
        MxValue timestampValue,
        int userId,
        CancellationToken cancellationToken = default)
    {
        MxCommandReply reply = await Write2RawAsync(
            serverHandle,
            itemHandle,
            value,
            timestampValue,
            userId,
            cancellationToken)
            .ConfigureAwait(false);
        reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
    }

    /// <summary>
    /// Writes a value and timestamp to an item on the MXAccess server without error checking.
    /// </summary>
    /// <param name="serverHandle">The ServerHandle from register.</param>
    /// <param name="itemHandle">The ItemHandle from add-item.</param>
    /// <param name="value">The value to write.</param>
    /// <param name="timestampValue">The timestamp to write with the value.</param>
    /// <param name="userId">User ID context for the write.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="value"/> or <paramref name="timestampValue"/> is <c>null</c>.
    /// </exception>
    public Task Write2RawAsync(
        int serverHandle,
        int itemHandle,
        MxValue value,
        MxValue timestampValue,
        int userId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(value);
        ArgumentNullException.ThrowIfNull(timestampValue);
        return InvokeCommandAsync(
            new MxCommand
            {
                Kind = MxCommandKind.Write2,
                Write2 = new Write2Command
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                    Value = value,
                    TimestampValue = timestampValue,
                    UserId = userId,
                },
            },
            cancellationToken);
    }

    /// <summary>
    /// Invokes an MXAccess command on this session by forwarding a caller-built request to the
    /// gateway client.
    /// </summary>
    /// <param name="request">The command request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
    public Task InvokeAsync(
        MxCommandRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);

        // NOTE(review): the request is forwarded as-is; its SessionId is neither set nor checked
        // against this session here — presumably the caller populates it. Confirm.
        return _client.InvokeAsync(request, cancellationToken);
    }

    /// <summary>
    /// Streams events from the worker for this session, optionally starting after a given sequence number.
    /// </summary>
    /// <param name="afterWorkerSequence">The sequence number to stream from. Defaults to 0.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>An async enumerable of events.</returns>
    public IAsyncEnumerable StreamEventsAsync(
        ulong afterWorkerSequence = 0,
        CancellationToken cancellationToken = default)
    {
        return _client.StreamEventsAsync(
            new StreamEventsRequest
            {
                SessionId = SessionId,
                AfterWorkerSequence = afterWorkerSequence,
            },
            cancellationToken);
    }

    /// <summary>
    /// Closes the session and releases resources.
    /// </summary>
    /// <returns>A task that completes once the session is closed and the close lock is disposed.</returns>
    public async ValueTask DisposeAsync()
    {
        // Uses the default (non-cancellable) token; a cached close reply makes this a no-op call.
        // NOTE(review): if CloseAsync throws, the exception propagates and the Dispose below is
        // skipped. Also, when the fast path returns while another caller still holds the lock,
        // that caller's Release() may run after Dispose(). Confirm both are acceptable.
        await CloseAsync().ConfigureAwait(false);
        _closeLock.Dispose();
    }

    /// <summary>
    /// Wraps a command in a request for this session and sends it through the gateway client.
    /// </summary>
    /// <param name="command">The command to send.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The raw server reply.</returns>
    private Task InvokeCommandAsync(
        MxCommand command,
        CancellationToken cancellationToken)
    {
        return _client.InvokeAsync(
            new MxCommandRequest
            {
                SessionId = SessionId,
                // Fresh correlation id per request: a GUID in "N" format (32 hex digits, no hyphens).
                ClientCorrelationId = Guid.NewGuid().ToString("N"),
                Command = command,
            },
            cancellationToken);
    }
}