using System; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using ArchestrA.MxAccess; using ZB.MOM.WW.LmxProxy.Host.Domain; namespace ZB.MOM.WW.LmxProxy.Host.Implementation { /// /// Connection management for MxAccessClient. /// public sealed partial class MxAccessClient { /// /// Asynchronously connects to the MxAccess server. /// /// A cancellation token to observe while waiting for the task to complete. /// A task that represents the asynchronous connect operation. /// Thrown if the client has been disposed. /// Thrown if registration with MxAccess fails. /// Thrown if any other error occurs during connection. public async Task ConnectAsync(CancellationToken ct = default) { // COM operations must run on STA thread, so we use Task.Run here await Task.Run(ConnectInternal, ct); // Recreate stored subscriptions after successful connection await RecreateStoredSubscriptionsAsync(); } /// /// Asynchronously disconnects from the MxAccess server and cleans up resources. /// /// A cancellation token to observe while waiting for the task to complete. /// A task that represents the asynchronous disconnect operation. public async Task DisconnectAsync(CancellationToken ct = default) { // COM operations must run on STA thread, so we use Task.Run here await Task.Run(() => DisconnectInternal(), ct); } /// /// Internal synchronous connection logic. /// private void ConnectInternal() { lock (_lock) { ValidateNotDisposed(); if (IsConnected) { return; } try { Logger.Information("Attempting to connect to MxAccess"); SetConnectionState(ConnectionState.Connecting); InitializeMxAccessConnection(); RegisterWithMxAccess(); } catch (Exception ex) { Logger.Error(ex, "Failed to connect to MxAccess"); Cleanup(); SetConnectionState(ConnectionState.Disconnected, ex.Message); throw; } } } /// /// Validates that the client has not been disposed. /// private void ValidateNotDisposed() { if (_disposed) { throw new ObjectDisposedException(nameof(MxAccessClient)); } } /// /// Initializes the MxAccess COM connection and event handlers. /// private void InitializeMxAccessConnection() { // Create the COM object _lmxProxy = new LMXProxyServer(); // Wire up event handlers _lmxProxy.OnDataChange += OnDataChange; _lmxProxy.OnWriteComplete += OnWriteComplete; _lmxProxy.OperationComplete += OnOperationComplete; } /// /// Registers with the MxAccess server. /// private void RegisterWithMxAccess() { // Register with the server if (_lmxProxy == null) { throw new InvalidOperationException("MxAccess proxy is not initialized"); } _connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host"); if (_connectionHandle > 0) { SetConnectionState(ConnectionState.Connected); Logger.Information("Successfully connected to MxAccess with handle {Handle}", _connectionHandle); } else { throw new InvalidOperationException("Failed to register with MxAccess - invalid handle returned"); } } /// /// Internal synchronous disconnection logic. /// private void DisconnectInternal() { lock (_lock) { if (!IsConnected || _lmxProxy == null) { return; } try { Logger.Information("Disconnecting from MxAccess"); SetConnectionState(ConnectionState.Disconnecting); RemoveAllSubscriptions(); UnregisterFromMxAccess(); Cleanup(); SetConnectionState(ConnectionState.Disconnected); Logger.Information("Successfully disconnected from MxAccess"); } catch (Exception ex) { Logger.Error(ex, "Error during disconnect"); Cleanup(); SetConnectionState(ConnectionState.Disconnected, ex.Message); } } } /// /// Removes all active subscriptions. /// private void RemoveAllSubscriptions() { var subscriptionsToRemove = _subscriptions.Values.ToList(); var failedRemovals = new List(); foreach (SubscriptionInfo? sub in subscriptionsToRemove) { if (!TryRemoveSubscription(sub)) { failedRemovals.Add(sub.Address); } } if (failedRemovals.Any()) { Logger.Warning("Failed to cleanly remove {Count} subscriptions: {Addresses}", failedRemovals.Count, string.Join(", ", failedRemovals)); } _subscriptions.Clear(); _subscriptionsByHandle.Clear(); // Note: We intentionally keep _storedSubscriptions to recreate them on reconnect } /// /// Attempts to remove a single subscription. /// private bool TryRemoveSubscription(SubscriptionInfo subscription) { try { if (_lmxProxy == null) { return false; } _lmxProxy.UnAdvise(_connectionHandle, subscription.ItemHandle); _lmxProxy.RemoveItem(_connectionHandle, subscription.ItemHandle); return true; } catch (Exception ex) { Logger.Warning(ex, "Error removing subscription for {Address}", subscription.Address); return false; } } /// /// Unregisters from the MxAccess server. /// private void UnregisterFromMxAccess() { if (_connectionHandle > 0 && _lmxProxy != null) { _lmxProxy.Unregister(_connectionHandle); _connectionHandle = 0; } } /// /// Cleans up resources and releases the COM object. /// Removes event handlers and releases the proxy COM object if present. /// private void Cleanup() { try { if (_lmxProxy != null) { // Remove event handlers _lmxProxy.OnDataChange -= OnDataChange; _lmxProxy.OnWriteComplete -= OnWriteComplete; _lmxProxy.OperationComplete -= OnOperationComplete; // Release COM object int refCount = Marshal.ReleaseComObject(_lmxProxy); if (refCount > 0) { Logger.Warning("COM object reference count after release: {RefCount}", refCount); // Force final release while (refCount > 0) { refCount = Marshal.ReleaseComObject(_lmxProxy); } } _lmxProxy = null; } _connectionHandle = 0; } catch (Exception ex) { Logger.Warning(ex, "Error during cleanup"); } } /// /// Recreates all stored subscriptions after reconnection. /// private async Task RecreateStoredSubscriptionsAsync() { List subscriptionsToRecreate; lock (_lock) { // Create a copy to avoid holding the lock during async operations subscriptionsToRecreate = new List(_storedSubscriptions); } if (subscriptionsToRecreate.Count == 0) { Logger.Debug("No stored subscriptions to recreate"); return; } Logger.Information("Recreating {Count} stored subscription groups after reconnection", subscriptionsToRecreate.Count); foreach (StoredSubscription? storedSub in subscriptionsToRecreate) { try { // Recreate the subscription without storing it again await SubscribeInternalAsync(storedSub.Addresses, storedSub.Callback, false); Logger.Information("Successfully recreated subscription group {GroupId} with {Count} addresses", storedSub.GroupId, storedSub.Addresses.Count); } catch (Exception ex) { Logger.Error(ex, "Failed to recreate subscription group {GroupId}", storedSub.GroupId); } } } } }