using System; using System.Threading; using System.Threading.Tasks; using Serilog; using ZB.MOM.WW.LmxOpcUa.Host.Domain; namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess { public sealed partial class MxAccessClient { public async Task ReadAsync(string fullTagReference, CancellationToken ct = default) { if (_state != ConnectionState.Connected) return Vtq.Bad(Quality.BadNotConnected); await _operationSemaphore.WaitAsync(ct); try { using var scope = _metrics.BeginOperation("Read"); var tcs = new TaskCompletionSource(); // Subscribe, get first value, unsubscribe void OnValue(string addr, Vtq vtq) => tcs.TrySetResult(vtq); var itemHandle = await _staThread.RunAsync(() => { var h = _proxy.AddItem(_connectionHandle, fullTagReference); _proxy.AdviseSupervisory(_connectionHandle, h); return h; }); _handleToAddress[itemHandle] = fullTagReference; _addressToHandle[fullTagReference] = itemHandle; _storedSubscriptions[fullTagReference] = OnValue; try { using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); cts.CancelAfter(TimeSpan.FromSeconds(_config.ReadTimeoutSeconds)); cts.Token.Register(() => tcs.TrySetResult(Vtq.Bad(Quality.BadCommFailure))); return await tcs.Task; } catch { scope.SetSuccess(false); return Vtq.Bad(Quality.BadCommFailure); } finally { _storedSubscriptions.TryRemove(fullTagReference, out _); _handleToAddress.TryRemove(itemHandle, out _); _addressToHandle.TryRemove(fullTagReference, out _); try { await _staThread.RunAsync(() => { _proxy.UnAdviseSupervisory(_connectionHandle, itemHandle); _proxy.RemoveItem(_connectionHandle, itemHandle); }); } catch (Exception ex) { Log.Warning(ex, "Error cleaning up read subscription for {Address}", fullTagReference); } } } finally { _operationSemaphore.Release(); } } public async Task WriteAsync(string fullTagReference, object value, CancellationToken ct = default) { if (_state != ConnectionState.Connected) return false; await _operationSemaphore.WaitAsync(ct); try { using var scope = _metrics.BeginOperation("Write"); var itemHandle = await _staThread.RunAsync(() => { var h = _proxy.AddItem(_connectionHandle, fullTagReference); _proxy.AdviseSupervisory(_connectionHandle, h); return h; }); _handleToAddress[itemHandle] = fullTagReference; _addressToHandle[fullTagReference] = itemHandle; var tcs = new TaskCompletionSource(); _pendingWrites[itemHandle] = tcs; try { await _staThread.RunAsync(() => _proxy.Write(_connectionHandle, itemHandle, value, -1)); using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); cts.CancelAfter(TimeSpan.FromSeconds(_config.WriteTimeoutSeconds)); cts.Token.Register(() => tcs.TrySetResult(true)); // timeout assumes success return await tcs.Task; } catch (Exception ex) { scope.SetSuccess(false); Log.Error(ex, "Write failed for {Address}", fullTagReference); return false; } finally { _pendingWrites.TryRemove(itemHandle, out _); _handleToAddress.TryRemove(itemHandle, out _); _addressToHandle.TryRemove(fullTagReference, out _); try { await _staThread.RunAsync(() => { _proxy.UnAdviseSupervisory(_connectionHandle, itemHandle); _proxy.RemoveItem(_connectionHandle, itemHandle); }); } catch (Exception ex) { Log.Warning(ex, "Error cleaning up write subscription for {Address}", fullTagReference); } } } finally { _operationSemaphore.Release(); } } } }