diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs index e2660eaa..8435d1d6 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs @@ -32,6 +32,25 @@ public sealed class RealMxGatewayClient : IMxGatewayClient private readonly ConcurrentDictionary _tagToHandle = new(); private readonly ConcurrentDictionary _handleToTag = new(); + // Item handles already advised in SUPERVISORY mode (advise-once tracking). + // MXAccess only accepts a write on an item that holds a supervisory advise; a + // plain Advise is not sufficient and the COM Write blocks until the gateway + // command timeout. When the connection has no write-user context + // (WriteUserId == 0) we therefore advise supervisory by DEFAULT — for both the + // monitoring subscription and any write — so the one connection can read and + // write. A supervisory advise still delivers OnDataChange (monitoring is + // unaffected), and the worker's UnAdvise tears down either advice kind. The + // Lazy per handle guarantees the advise runs at most once even under a + // concurrent first-time subscribe+write on the same new handle: every racing + // caller awaits the same Task, so none reaches WriteBulk before the advise has + // completed. A faulted advise is evicted so the next write retries. + private readonly ConcurrentDictionary> _supervisoryAdvised = new(); + + // With no configured MXAccess write-user this connection behaves as a + // supervisory client: every advise defaults to AdviseSupervisory so writes are + // accepted. A configured write-user keeps the prior plain-advise behaviour. + private bool UseSupervisoryAdvise => _writeUserId == 0; + /// Initializes a new instance of . /// Logger factory shared with the gateway client. public RealMxGatewayClient(ILoggerFactory? loggerFactory) @@ -74,7 +93,12 @@ public sealed class RealMxGatewayClient : IMxGatewayClient public async Task SubscribeAsync(string tagPath, CancellationToken ct = default) { var handle = await GetOrAddItemHandleAsync(tagPath, ct).ConfigureAwait(false); - await _session!.AdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false); + // With no write-user context, advise supervisory by default so the same + // subscription also makes the item writable; otherwise plain monitor-advise. + if (UseSupervisoryAdvise) + await EnsureSupervisoryAdvisedAsync(handle, ct).ConfigureAwait(false); + else + await _session!.AdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false); return handle.ToString(CultureInfo.InvariantCulture); } @@ -87,6 +111,9 @@ public sealed class RealMxGatewayClient : IMxGatewayClient await _session!.UnAdviseAsync(_serverHandle, handle, ct).ConfigureAwait(false); await _session.RemoveItemAsync(_serverHandle, handle, ct).ConfigureAwait(false); + // RemoveItem releases the item and all its advices (plain + supervisory), + // so forget any supervisory-advise state for this handle. + _supervisoryAdvised.TryRemove(handle, out _); if (_handleToTag.TryRemove(handle, out var tag)) _tagToHandle.TryRemove(tag, out _); } @@ -117,6 +144,12 @@ public sealed class RealMxGatewayClient : IMxGatewayClient foreach (var (tag, value) in writes) { var handle = await GetOrAddItemHandleAsync(tag, ct).ConfigureAwait(false); + // MXAccess requires a supervisory advise on the item before it will + // accept a write; without it the worker's synchronous COM Write blocks. + // With no write-user context we advise supervisory by default (a + // supervisory subscription may already cover this handle — advise-once). + if (UseSupervisoryAdvise) + await EnsureSupervisoryAdvisedAsync(handle, ct).ConfigureAwait(false); entries.Add(new WriteBulkEntry { ItemHandle = handle, @@ -323,6 +356,58 @@ public sealed class RealMxGatewayClient : IMxGatewayClient return handle; } + /// + /// Ensures the item is advised in MXAccess SUPERVISORY mode (advise-once). A + /// supervisory advise is what lets the gateway accept a write and it still + /// delivers OnDataChange, so it doubles as the monitoring subscription when this + /// connection has no write-user context. The client package exposes only a plain + /// AdviseAsync, so the supervisory advise is issued as a raw + /// command (mirroring how the session + /// builds every other command). The result is remembered per handle; a failure is + /// not cached so the next attempt retries rather than wedging the item permanently. + /// + private async Task EnsureSupervisoryAdvisedAsync(int handle, CancellationToken ct) + { + // GetOrAdd + Lazy runs the advise at most once per handle; concurrent + // callers await the same Task and so never reach WriteBulk before it + // completes (default Lazy is thread-safe, factory runs once). + var advise = _supervisoryAdvised.GetOrAdd( + handle, + h => new Lazy(() => AdviseSupervisoryCoreAsync(h, ct))); + try + { + await advise.Value.ConfigureAwait(false); + } + catch + { + // Evict the faulted entry (only if it's still the one we awaited) so the + // next write retries rather than caching a transient advise failure. + _supervisoryAdvised.TryRemove(new KeyValuePair>(handle, advise)); + throw; + } + } + + private async Task AdviseSupervisoryCoreAsync(int handle, CancellationToken ct) + { + var reply = await _session!.InvokeAsync( + new MxCommandRequest + { + SessionId = _session.SessionId, + ClientCorrelationId = Guid.NewGuid().ToString("N"), + Command = new MxCommand + { + Kind = MxCommandKind.AdviseSupervisory, + AdviseSupervisory = new AdviseSupervisoryCommand + { + ServerHandle = _serverHandle, + ItemHandle = handle, + }, + }, + }, + ct).ConfigureAwait(false); + reply.EnsureProtocolSuccess().EnsureMxAccessSuccess(); + } + /// /// Maps MXAccess quality. A failing status proxy is authoritative bad; otherwise /// the OPC-style quality byte: ≥192 Good, ≥64 Uncertain, else Bad.