fix(dcl): default MxGateway advises to supervisory when WriteUserId==0
Writes through the MxGateway data connection (e.g. the Ipsen MoveIn flow writing MES-receiver attributes) hung ~30s and changed nothing, while reads of the same attributes worked. Root cause: MXAccess only accepts a write on an item that holds a SUPERVISORY advise; the write path did AddItem + WriteBulk with no advise and the monitoring subscription used a plain Advise, so the worker's synchronous COM Write blocked until the gateway command timeout. (Plain, non-secured writes need no user/login.) Verified live: with a supervisory advise the write returns ok in ~22ms; without it it does not. When the connection has no MXAccess write-user context (WriteUserId == 0) it now behaves as a supervisory client: every advise defaults to AdviseSupervisory — both the monitoring subscription (SubscribeAsync) and the write path — so one connection can read and write. A supervisory advise still delivers OnDataChange (the worker treats either advice kind as sufficient for updates) so monitoring is unaffected, and the worker's UnAdvise tears down either kind, so unsubscribe is unchanged. AdviseSupervisory is issued as a raw MxCommandKind.AdviseSupervisory via the session's Invoke (the client package exposes only plain Advise). The advise runs at most once per handle via a Lazy<Task> so a concurrent first-time subscribe+write on the same new handle both await the same advise (neither writes before it completes); a faulted advise is evicted so the next write retries. Dropped on unsubscribe. A configured non-zero WriteUserId keeps the prior plain-advise behaviour.
This commit is contained in:
@@ -32,6 +32,25 @@ public sealed class RealMxGatewayClient : IMxGatewayClient
|
||||
private readonly ConcurrentDictionary<string, int> _tagToHandle = new();
|
||||
private readonly ConcurrentDictionary<int, string> _handleToTag = new();
|
||||
|
||||
// Item handles already advised in SUPERVISORY mode (advise-once tracking).
|
||||
// MXAccess only accepts a write on an item that holds a supervisory advise; a
|
||||
// plain Advise is not sufficient and the COM Write blocks until the gateway
|
||||
// command timeout. When the connection has no write-user context
|
||||
// (WriteUserId == 0) we therefore advise supervisory by DEFAULT — for both the
|
||||
// monitoring subscription and any write — so the one connection can read and
|
||||
// write. A supervisory advise still delivers OnDataChange (monitoring is
|
||||
// unaffected), and the worker's UnAdvise tears down either advice kind. The
|
||||
// Lazy<Task> per handle guarantees the advise runs at most once even under a
|
||||
// concurrent first-time subscribe+write on the same new handle: every racing
|
||||
// caller awaits the same Task, so none reaches WriteBulk before the advise has
|
||||
// completed. A faulted advise is evicted so the next write retries.
|
||||
private readonly ConcurrentDictionary<int, Lazy<Task>> _supervisoryAdvised = new();
|
||||
|
||||
// With no configured MXAccess write-user this connection behaves as a
|
||||
// supervisory client: every advise defaults to AdviseSupervisory so writes are
|
||||
// accepted. A configured write-user keeps the prior plain-advise behaviour.
|
||||
private bool UseSupervisoryAdvise => _writeUserId == 0;
|
||||
|
||||
/// <summary>Initializes a new instance of <see cref="RealMxGatewayClient"/>.</summary>
|
||||
/// <param name="loggerFactory">Logger factory shared with the gateway client.</param>
|
||||
public RealMxGatewayClient(ILoggerFactory? loggerFactory)
|
||||
@@ -74,7 +93,12 @@ public sealed class RealMxGatewayClient : IMxGatewayClient
|
||||
public async Task<string> SubscribeAsync(string tagPath, CancellationToken ct = default)
|
||||
{
|
||||
var handle = await GetOrAddItemHandleAsync(tagPath, ct).ConfigureAwait(false);
|
||||
await _session!.AdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false);
|
||||
// With no write-user context, advise supervisory by default so the same
|
||||
// subscription also makes the item writable; otherwise plain monitor-advise.
|
||||
if (UseSupervisoryAdvise)
|
||||
await EnsureSupervisoryAdvisedAsync(handle, ct).ConfigureAwait(false);
|
||||
else
|
||||
await _session!.AdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false);
|
||||
return handle.ToString(CultureInfo.InvariantCulture);
|
||||
}
|
||||
|
||||
@@ -87,6 +111,9 @@ public sealed class RealMxGatewayClient : IMxGatewayClient
|
||||
await _session!.UnAdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false);
|
||||
await _session.RemoveItemAsync(_serverHandle, handle, ct).ConfigureAwait(false);
|
||||
|
||||
// RemoveItem releases the item and all its advices (plain + supervisory),
|
||||
// so forget any supervisory-advise state for this handle.
|
||||
_supervisoryAdvised.TryRemove(handle, out _);
|
||||
if (_handleToTag.TryRemove(handle, out var tag))
|
||||
_tagToHandle.TryRemove(tag, out _);
|
||||
}
|
||||
@@ -117,6 +144,12 @@ public sealed class RealMxGatewayClient : IMxGatewayClient
|
||||
foreach (var (tag, value) in writes)
|
||||
{
|
||||
var handle = await GetOrAddItemHandleAsync(tag, ct).ConfigureAwait(false);
|
||||
// MXAccess requires a supervisory advise on the item before it will
|
||||
// accept a write; without it the worker's synchronous COM Write blocks.
|
||||
// With no write-user context we advise supervisory by default (a
|
||||
// supervisory subscription may already cover this handle — advise-once).
|
||||
if (UseSupervisoryAdvise)
|
||||
await EnsureSupervisoryAdvisedAsync(handle, ct).ConfigureAwait(false);
|
||||
entries.Add(new WriteBulkEntry
|
||||
{
|
||||
ItemHandle = handle,
|
||||
@@ -323,6 +356,58 @@ public sealed class RealMxGatewayClient : IMxGatewayClient
|
||||
return handle;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Ensures the item is advised in MXAccess SUPERVISORY mode (advise-once). A
|
||||
/// supervisory advise is what lets the gateway accept a write and it still
|
||||
/// delivers OnDataChange, so it doubles as the monitoring subscription when this
|
||||
/// connection has no write-user context. The client package exposes only a plain
|
||||
/// <c>AdviseAsync</c>, so the supervisory advise is issued as a raw
|
||||
/// <see cref="MxCommandKind.AdviseSupervisory"/> command (mirroring how the session
|
||||
/// builds every other command). The result is remembered per handle; a failure is
|
||||
/// not cached so the next attempt retries rather than wedging the item permanently.
|
||||
/// </summary>
|
||||
private async Task EnsureSupervisoryAdvisedAsync(int handle, CancellationToken ct)
|
||||
{
|
||||
// GetOrAdd + Lazy runs the advise at most once per handle; concurrent
|
||||
// callers await the same Task and so never reach WriteBulk before it
|
||||
// completes (default Lazy is thread-safe, factory runs once).
|
||||
var advise = _supervisoryAdvised.GetOrAdd(
|
||||
handle,
|
||||
h => new Lazy<Task>(() => AdviseSupervisoryCoreAsync(h, ct)));
|
||||
try
|
||||
{
|
||||
await advise.Value.ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Evict the faulted entry (only if it's still the one we awaited) so the
|
||||
// next write retries rather than caching a transient advise failure.
|
||||
_supervisoryAdvised.TryRemove(new KeyValuePair<int, Lazy<Task>>(handle, advise));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task AdviseSupervisoryCoreAsync(int handle, CancellationToken ct)
|
||||
{
|
||||
var reply = await _session!.InvokeAsync(
|
||||
new MxCommandRequest
|
||||
{
|
||||
SessionId = _session.SessionId,
|
||||
ClientCorrelationId = Guid.NewGuid().ToString("N"),
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AdviseSupervisory,
|
||||
AdviseSupervisory = new AdviseSupervisoryCommand
|
||||
{
|
||||
ServerHandle = _serverHandle,
|
||||
ItemHandle = handle,
|
||||
},
|
||||
},
|
||||
},
|
||||
ct).ConfigureAwait(false);
|
||||
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps MXAccess quality. A failing status proxy is authoritative bad; otherwise
|
||||
/// the OPC-style quality byte: ≥192 Good, ≥64 Uncertain, else Bad.
|
||||
|
||||
Reference in New Issue
Block a user