using System;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Host.Domain;

namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
    public sealed partial class MxAccessClient
    {
        /// <summary>
        ///     Opens the MXAccess runtime connection, replays stored subscriptions, and starts the optional probe
        ///     subscription. Returns immediately when the client is already in the
        ///     <see cref="ConnectionState.Connected" /> state.
        /// </summary>
        /// <param name="ct">
        ///     A token that cancels the connection attempt. It is checked once, before the attempt starts; a
        ///     registration that is already in flight is not interrupted.
        /// </param>
        /// <exception cref="OperationCanceledException">
        ///     <paramref name="ct" /> was already cancelled when the attempt was about to start.
        /// </exception>
        public async Task ConnectAsync(CancellationToken ct = default)
        {
            if (_state == ConnectionState.Connected)
            {
                return;
            }

            // Honor cancellation before any state change so a cancelled call leaves the client untouched.
            ct.ThrowIfCancellationRequested();

            SetState(ConnectionState.Connecting);

            try
            {
                // Event wiring and registration are dispatched together through _staThread.
                _connectionHandle = await _staThread.RunAsync(() =>
                {
                    AttachProxyEvents();
                    return _proxy.Register(_config.ClientName);
                });

                Log.Information("MxAccess registered with handle {Handle}", _connectionHandle);
                SetState(ConnectionState.Connected);

                // Replay stored subscriptions
                await ReplayStoredSubscriptionsAsync();

                // Start probe if configured
                if (!string.IsNullOrWhiteSpace(_config.ProbeTag))
                {
                    _probeTag = _config.ProbeTag;
                    _lastProbeValueTime = DateTime.UtcNow;
                    await SubscribeInternalAsync(_probeTag!);
                    Log.Information("Probe tag subscribed: {ProbeTag}", _probeTag);
                }
            }
            catch (Exception ex)
            {
                // Best-effort unhook of the proxy events; a failure here must not mask the original error.
                try
                {
                    await _staThread.RunAsync(DetachProxyEvents);
                }
                catch (Exception cleanupEx)
                {
                    Log.Warning(cleanupEx, "Failed to detach proxy events after connection failure");
                }

                Log.Error(ex, "MxAccess connection failed");
                SetState(ConnectionState.Error, ex.Message);
                throw;
            }
        }

        /// <summary>
        ///     Disconnects from the runtime and cleans up active handles, callbacks, and pending operations.
        ///     Returns immediately when the client is already in the <see cref="ConnectionState.Disconnected" />
        ///     state. Errors during cleanup are logged and not rethrown; the client always ends up
        ///     <see cref="ConnectionState.Disconnected" /> with its handle and pending-operation tables cleared.
        /// </summary>
        public async Task DisconnectAsync()
        {
            if (_state == ConnectionState.Disconnected)
            {
                return;
            }

            SetState(ConnectionState.Disconnecting);

            try
            {
                await _staThread.RunAsync(() =>
                {
                    // UnAdvise + RemoveItem for all active subscriptions
                    foreach (var kvp in _addressToHandle)
                    {
                        try
                        {
                            _proxy.UnAdviseSupervisory(_connectionHandle, kvp.Value);
                            _proxy.RemoveItem(_connectionHandle, kvp.Value);
                        }
                        catch (Exception ex)
                        {
                            Log.Warning(ex, "Error cleaning up subscription for {Address}", kvp.Key);
                        }
                    }

                    // Unwire events before unregister
                    DetachProxyEvents();

                    // Unregister
                    try
                    {
                        _proxy.Unregister(_connectionHandle);
                    }
                    catch (Exception ex)
                    {
                        Log.Warning(ex, "Error during Unregister");
                    }
                });
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Error during disconnect");
            }
            finally
            {
                // Clear bookkeeping even when the runtime cleanup above failed: the state becomes Disconnected
                // either way, so stale handles and pending operations must not survive into the next connection.
                _handleToAddress.Clear();
                _addressToHandle.Clear();
                _pendingReadsByAddress.Clear();
                _pendingWrites.Clear();

                SetState(ConnectionState.Disconnected);
            }
        }

        /// <summary>
        ///     Attempts to recover from a runtime fault by disconnecting and reconnecting the client.
        ///     A failure is logged and recorded as <see cref="ConnectionState.Error" />; it is not rethrown.
        /// </summary>
        public async Task ReconnectAsync()
        {
            SetState(ConnectionState.Reconnecting);

            // Log the value returned by the atomic increment rather than re-reading the field, so the
            // reported attempt number is the one this call actually produced.
            var attempt = Interlocked.Increment(ref _reconnectCount);
            Log.Information("MxAccess reconnect attempt #{Count}", attempt);

            try
            {
                await DisconnectAsync();
                await ConnectAsync();
            }
            catch (Exception ex)
            {
                Log.Error(ex, "Reconnect failed");
                SetState(ConnectionState.Error, ex.Message);
            }
        }

        /// <summary>
        ///     Subscribes the data-change and write-complete handlers to the proxy. Does nothing when the
        ///     handlers are already attached.
        /// </summary>
        private void AttachProxyEvents()
        {
            if (_proxyEventsAttached)
            {
                return;
            }

            _proxy.OnDataChange += HandleOnDataChange;
            _proxy.OnWriteComplete += HandleOnWriteComplete;
            _proxyEventsAttached = true;
        }

        /// <summary>
        ///     Unsubscribes the data-change and write-complete handlers from the proxy. Does nothing when the
        ///     handlers are not currently attached.
        /// </summary>
        private void DetachProxyEvents()
        {
            if (!_proxyEventsAttached)
            {
                return;
            }

            _proxy.OnDataChange -= HandleOnDataChange;
            _proxy.OnWriteComplete -= HandleOnWriteComplete;
            _proxyEventsAttached = false;
        }
    }
}