using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
///
/// Represents one gateway-backed MXAccess session.
///
public sealed class MxGatewaySession : IAsyncDisposable
{
private readonly MxGatewayClient _client;
private readonly SemaphoreSlim _closeLock = new(1, 1);
private readonly object _disposeGate = new();
private CloseSessionReply? _closeReply;
private int _activeCloseCount;
private bool _closeLockDisposed;
///
/// Initializes a new session backed by the given MXAccess gateway client.
///
/// The gateway client used for commands and events.
/// The server's session creation response.
internal MxGatewaySession(
MxGatewayClient client,
OpenSessionReply openSessionReply)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply));
}
///
/// The session ID assigned by the gateway.
///
public string SessionId => OpenSessionReply.SessionId;
///
/// The server's session creation response containing metadata.
///
public OpenSessionReply OpenSessionReply { get; }
///
/// Closes the session on the gateway. Idempotent.
///
/// Cancellation token.
/// The server's close-session reply.
public async Task CloseAsync(CancellationToken cancellationToken = default)
{
if (_closeReply is not null)
{
return _closeReply;
}
// Register as an in-flight closer under the dispose gate. DisposeAsync waits for
// _activeCloseCount to drain before disposing the close lock, so the semaphore is
// guaranteed to outlive every WaitAsync started here.
lock (_disposeGate)
{
ObjectDisposedException.ThrowIf(_closeLockDisposed, this);
_activeCloseCount++;
}
try
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_closeReply is not null)
{
return _closeReply;
}
_closeReply = await _client.CloseSessionRawAsync(
new CloseSessionRequest { SessionId = SessionId },
cancellationToken)
.ConfigureAwait(false);
return _closeReply;
}
finally
{
_closeLock.Release();
}
}
finally
{
lock (_disposeGate)
{
_activeCloseCount--;
}
}
}
///
/// Registers a client with the MXAccess session, returning a ServerHandle.
///
/// Name to register.
/// Cancellation token.
/// The server handle assigned to the registered client.
public async Task RegisterAsync(
string clientName,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.Register?.ServerHandle
?? throw CreateMissingPayloadException(reply, "register");
}
///
/// Registers a client with the MXAccess session without error checking.
///
/// Name to register.
/// Cancellation token.
/// The raw server reply.
public Task RegisterRawAsync(
string clientName,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(clientName);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = clientName },
},
cancellationToken);
}
///
/// Adds an item to the MXAccess session, returning an ItemHandle.
///
/// The ServerHandle from register.
/// The item tag address.
/// Cancellation token.
/// The item handle assigned to the new item.
public async Task AddItemAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItemRawAsync(
serverHandle,
itemDefinition,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem?.ItemHandle
?? throw CreateMissingPayloadException(reply, "add_item");
}
///
/// Adds an item to the MXAccess session without error checking.
///
/// The ServerHandle from register.
/// The item tag address.
/// Cancellation token.
/// The raw server reply.
public Task AddItemRawAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
},
},
cancellationToken);
}
///
/// Adds an item with context to the MXAccess session, returning an ItemHandle.
///
/// The ServerHandle from register.
/// The item tag address.
/// Additional context for the item.
/// Cancellation token.
/// The item handle assigned to the new item.
public async Task AddItem2Async(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItem2RawAsync(
serverHandle,
itemDefinition,
itemContext,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItem2?.ItemHandle
?? throw CreateMissingPayloadException(reply, "add_item2");
}
///
/// Adds an item with context to the MXAccess session without error checking.
///
/// The ServerHandle from register.
/// The item tag address.
/// Additional context for the item.
/// Cancellation token.
/// The raw server reply.
public Task AddItem2RawAsync(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem2,
AddItem2 = new AddItem2Command
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
ItemContext = itemContext ?? string.Empty,
},
},
cancellationToken);
}
///
/// Subscribes to events for an item (advises in MXAccess terminology).
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// Cancellation token.
public async Task AdviseAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
///
/// Subscribes to events for an item without error checking.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// Cancellation token.
/// The raw server reply.
public Task AdviseRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
///
/// Unsubscribes from events for an item (unadvises in MXAccess terminology).
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// Cancellation token.
public async Task UnAdviseAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await UnAdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
///
/// Unsubscribes from events for an item without error checking.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// Cancellation token.
/// The raw server reply.
public Task UnAdviseRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.UnAdvise,
UnAdvise = new UnAdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
///
/// Removes an item from the MXAccess session.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// Cancellation token.
public async Task RemoveItemAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await RemoveItemRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
///
/// Removes an item from the MXAccess session without error checking.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// Cancellation token.
/// The raw server reply.
public Task RemoveItemRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.RemoveItem,
RemoveItem = new RemoveItemCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
///
/// Adds multiple items to the MXAccess session in a single command.
///
/// The ServerHandle from register.
/// The item tag addresses to add.
/// Cancellation token.
/// Per-item subscription results.
public async Task> AddItemBulkAsync(
int serverHandle,
IReadOnlyList tagAddresses,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
AddItemBulkCommand command = new() { ServerHandle = serverHandle };
command.TagAddresses.Add(tagAddresses);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItemBulk,
AddItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItemBulk?.Results.ToArray() ?? [];
}
///
/// Advises multiple items in a single command.
///
/// The ServerHandle from register.
/// The ItemHandles to advise.
/// Cancellation token.
/// Per-item subscription results.
public async Task> AdviseItemBulkAsync(
int serverHandle,
IReadOnlyList itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
AdviseItemBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AdviseItemBulk,
AdviseItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AdviseItemBulk?.Results.ToArray() ?? [];
}
///
/// Removes multiple items in a single command.
///
/// The ServerHandle from register.
/// The ItemHandles to remove.
/// Cancellation token.
/// Per-item subscription results.
public async Task> RemoveItemBulkAsync(
int serverHandle,
IReadOnlyList itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
RemoveItemBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.RemoveItemBulk,
RemoveItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.RemoveItemBulk?.Results.ToArray() ?? [];
}
///
/// Unadvises multiple items in a single command.
///
/// The ServerHandle from register.
/// The ItemHandles to unadvise.
/// Cancellation token.
/// Per-item subscription results.
public async Task> UnAdviseItemBulkAsync(
int serverHandle,
IReadOnlyList itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnAdviseItemBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.UnAdviseItemBulk,
UnAdviseItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.UnAdviseItemBulk?.Results.ToArray() ?? [];
}
///
/// Adds and advises multiple items in a single command.
///
/// The ServerHandle from register.
/// The item tag addresses to add and advise.
/// Cancellation token.
/// Per-item subscription results.
public async Task> SubscribeBulkAsync(
int serverHandle,
IReadOnlyList tagAddresses,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
SubscribeBulkCommand command = new() { ServerHandle = serverHandle };
command.TagAddresses.Add(tagAddresses);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.SubscribeBulk,
SubscribeBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.SubscribeBulk?.Results.ToArray() ?? [];
}
///
/// Unadvises and removes multiple items in a single command.
///
/// The ServerHandle from register.
/// The ItemHandles to unsubscribe.
/// Cancellation token.
/// Per-item subscription results.
public async Task> UnsubscribeBulkAsync(
int serverHandle,
IReadOnlyList itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnsubscribeBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.UnsubscribeBulk,
UnsubscribeBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.UnsubscribeBulk?.Results.ToArray() ?? [];
}
///
/// Bulk Write — sequential MXAccess Write per entry on the worker's STA.
/// Per-item failures appear as BulkWriteResult entries with
/// WasSuccessful = false; the call never throws on per-item errors.
/// Protocol-level failures still throw via EnsureProtocolSuccess.
///
public async Task> WriteBulkAsync(
int serverHandle,
IReadOnlyList entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
WriteBulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.WriteBulk,
WriteBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.WriteBulk?.Results.ToArray() ?? [];
}
/// Bulk Write2 — sequential MXAccess Write2 (timestamped) per entry.
public async Task> Write2BulkAsync(
int serverHandle,
IReadOnlyList entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
Write2BulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write2Bulk,
Write2Bulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.Write2Bulk?.Results.ToArray() ?? [];
}
///
/// Bulk WriteSecured — sequential MXAccess WriteSecured per entry.
/// Credential-sensitive values must never reach logs; the client mirrors
/// the single-item WriteSecured redaction contract.
///
public async Task> WriteSecuredBulkAsync(
int serverHandle,
IReadOnlyList entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
WriteSecuredBulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.WriteSecuredBulk,
WriteSecuredBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.WriteSecuredBulk?.Results.ToArray() ?? [];
}
/// Bulk WriteSecured2 — sequential MXAccess WriteSecured2 (timestamped) per entry.
public async Task> WriteSecured2BulkAsync(
int serverHandle,
IReadOnlyList entries,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entries);
WriteSecured2BulkCommand command = new() { ServerHandle = serverHandle };
command.Entries.Add(entries);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.WriteSecured2Bulk,
WriteSecured2Bulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.WriteSecured2Bulk?.Results.ToArray() ?? [];
}
///
/// Bulk Read — snapshot the current value for each requested tag.
/// Returns the cached OnDataChange value when the tag is already advised
/// (was_cached = true), otherwise the worker takes the full AddItem +
/// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle. Per-tag
/// failures (timeout, invalid tag) appear as BulkReadResult entries with
/// WasSuccessful = false; the call never throws on per-tag errors.
///
public async Task> ReadBulkAsync(
int serverHandle,
IReadOnlyList tagAddresses,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
ReadBulkCommand command = new()
{
ServerHandle = serverHandle,
TimeoutMs = timeout <= TimeSpan.Zero ? 0u : (uint)Math.Min(timeout.TotalMilliseconds, uint.MaxValue),
};
command.TagAddresses.Add(tagAddresses);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.ReadBulk,
ReadBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.ReadBulk?.Results.ToArray() ?? [];
}
///
/// Writes a value to an item on the MXAccess server.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// The value to write.
/// User ID context for the write.
/// Cancellation token.
public async Task WriteAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
///
/// Writes a value to an item on the MXAccess server without error checking.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// The value to write.
/// User ID context for the write.
/// Cancellation token.
/// The raw server reply.
public Task WriteRawAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
UserId = userId,
},
},
cancellationToken);
}
///
/// Writes a value and timestamp to an item on the MXAccess server.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// The value to write.
/// The timestamp to write with the value.
/// User ID context for the write.
/// Cancellation token.
public async Task Write2Async(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await Write2RawAsync(
serverHandle,
itemHandle,
value,
timestampValue,
userId,
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
///
/// Writes a value and timestamp to an item on the MXAccess server without error checking.
///
/// The ServerHandle from register.
/// The ItemHandle from add-item.
/// The value to write.
/// The timestamp to write with the value.
/// User ID context for the write.
/// Cancellation token.
/// The raw server reply.
public Task Write2RawAsync(
int serverHandle,
int itemHandle,
MxValue value,
MxValue timestampValue,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
ArgumentNullException.ThrowIfNull(timestampValue);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write2,
Write2 = new Write2Command
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
TimestampValue = timestampValue,
UserId = userId,
},
},
cancellationToken);
}
///
/// Invokes an MXAccess command on this session.
///
/// The command request.
/// Cancellation token.
/// The raw server reply.
public Task InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
return _client.InvokeAsync(request, cancellationToken);
}
///
/// Streams events from the worker for this session, optionally starting after a given sequence number.
///
/// The sequence number to stream from. Defaults to 0.
/// Cancellation token.
/// An async enumerable of events.
public IAsyncEnumerable StreamEventsAsync(
ulong afterWorkerSequence = 0,
CancellationToken cancellationToken = default)
{
return _client.StreamEventsAsync(
new StreamEventsRequest
{
SessionId = SessionId,
AfterWorkerSequence = afterWorkerSequence,
},
cancellationToken);
}
///
/// Closes the session and releases resources.
///
public async ValueTask DisposeAsync()
{
lock (_disposeGate)
{
if (_closeLockDisposed)
{
return;
}
}
await CloseAsync().ConfigureAwait(false);
// Wait for every concurrent CloseAsync caller to leave the close lock before
// disposing it; once _closeReply is set those callers return without awaiting.
while (true)
{
lock (_disposeGate)
{
if (_activeCloseCount == 0)
{
_closeLockDisposed = true;
break;
}
}
await Task.Yield();
}
_closeLock.Dispose();
}
private Task InvokeCommandAsync(
MxCommand command,
CancellationToken cancellationToken)
{
return _client.InvokeAsync(
new MxCommandRequest
{
SessionId = SessionId,
ClientCorrelationId = Guid.NewGuid().ToString("N"),
Command = command,
},
cancellationToken);
}
///
/// Builds the exception thrown when a command reply passed protocol and
/// MXAccess success checks but is missing the typed handle-bearing payload
/// the command contract requires. Surfacing this as a clear error avoids
/// silently handing a zero handle to the caller (it would otherwise fall
/// through to , which is 0 when the
/// reply carries no return value).
///
private static MxGatewayException CreateMissingPayloadException(
MxCommandReply reply,
string expectedPayload)
{
return new MxGatewayException(
$"Gateway reply for command kind={reply.Kind} reported success but is missing "
+ $"the required '{expectedPayload}' payload; cannot resolve a handle. "
+ $"session={reply.SessionId}; correlation={reply.CorrelationId}");
}
}