From b6408726bcdfe24b345b33f39bf6d3a2c0bfab59 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Mar 2026 18:00:24 -0400 Subject: [PATCH] feat(lmxproxy): add STA thread with message pump for MxAccess COM callbacks Co-Authored-By: Claude Opus 4.6 (1M context) --- .../MxAccess/MxAccessClient.Connection.cs | 12 +- .../MxAccess/MxAccessClient.EventHandlers.cs | 2 + .../MxAccess/MxAccessClient.ReadWrite.cs | 6 +- .../MxAccess/MxAccessClient.Subscription.cs | 10 +- .../MxAccess/MxAccessClient.cs | 14 +- .../MxAccess/StaComThread.cs | 133 ++++++++++++++++++ .../ZB.MOM.WW.LmxProxy.Host.csproj | 1 + 7 files changed, 161 insertions(+), 17 deletions(-) create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs index 6d5af02..144b97f 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs @@ -12,7 +12,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess public sealed partial class MxAccessClient { /// - /// Connects to MxAccess via Task.Run (thread pool). + /// Connects to MxAccess on the dedicated STA thread. /// public async Task ConnectAsync(CancellationToken ct = default) { @@ -23,7 +23,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess try { - await Task.Run(() => ConnectInternal(), ct); + await _staThread.RunAsync(() => ConnectInternal()); lock (_lock) { @@ -46,7 +46,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } /// - /// Disconnects from MxAccess via Task.Run (thread pool). + /// Disconnects from MxAccess on the dedicated STA thread. /// public async Task DisconnectAsync(CancellationToken ct = default) { @@ -56,7 +56,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess try { - await Task.Run(() => DisconnectInternal()); + await _staThread.RunAsync(() => DisconnectInternal()); SetState(ConnectionState.Disconnected); Log.Information("Disconnected from MxAccess"); @@ -346,13 +346,13 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } /// - /// Cleans up COM objects via Task.Run after a failed connection. + /// Cleans up COM objects on the dedicated STA thread after a failed connection. /// private async Task CleanupComObjectsAsync() { try { - await Task.Run(() => + await _staThread.RunAsync(() => { lock (_lock) { diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs index 198409d..2cba331 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using ArchestrA.MxAccess; using Serilog; using ZB.MOM.WW.LmxProxy.Host.Domain; @@ -27,6 +28,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { try { + Log.Information("OnDataChange FIRED: handle={Handle}", phItemHandle); var quality = MapQuality(pwItemQuality); var timestamp = ConvertTimestamp(pftItemTimeStamp); diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs index 5cfb06e..474d970 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs @@ -183,14 +183,14 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } /// - /// Internal write implementation using Task.Run for COM calls. + /// Internal write implementation dispatched on the STA thread. /// MxAccess completes supervisory writes synchronously — the Write() call /// succeeding (not throwing) confirms the write. The OnWriteComplete callback /// is kept wired for diagnostic logging but is not awaited. /// private async Task WriteInternalAsync(string address, object value, CancellationToken ct) { - await Task.Run(() => + await _staThread.RunAsync(() => { lock (_lock) { @@ -243,7 +243,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } } } - }, ct); + }); } /// diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs index 0b9935e..741fff9 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs @@ -13,7 +13,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess /// /// Subscribes to value changes for the specified addresses. /// Stores subscription state for reconnect replay. - /// COM calls dispatched via Task.Run. + /// COM calls dispatched on the dedicated STA thread. /// public async Task SubscribeAsync( IEnumerable addresses, @@ -25,7 +25,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess var addressList = addresses.ToList(); - await Task.Run(() => + await _staThread.RunAsync(() => { lock (_lock) { @@ -40,7 +40,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _storedSubscriptions[address] = callback; } } - }, ct); + }); Log.Information("Subscribed to {Count} tags", addressList.Count); @@ -63,7 +63,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { var addressList = addresses.ToList(); - await Task.Run(() => + await _staThread.RunAsync(() => { lock (_lock) { @@ -93,7 +93,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count); - await Task.Run(() => + await _staThread.RunAsync(() => { lock (_lock) { diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs index d424c1a..ffc57db 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs @@ -10,8 +10,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { /// /// Wraps the ArchestrA MXAccess COM API. All COM operations - /// execute via Task.Run (thread pool / MTA), relying on COM - /// marshaling to handle cross-apartment calls. + /// execute on a dedicated STA thread with a Windows message pump + /// so that COM callbacks (OnDataChange, OnWriteComplete) are + /// delivered correctly. /// public sealed partial class MxAccessClient : IScadaClient { @@ -29,7 +30,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess private readonly SemaphoreSlim _readSemaphore; private readonly SemaphoreSlim _writeSemaphore; - // COM objects + // STA thread for COM interop + private readonly StaComThread _staThread; + + // COM objects — only accessed on the STA thread private LMXProxyServer? _lmxProxy; private int _connectionHandle; @@ -93,6 +97,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); _writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); + + _staThread = new StaComThread(); + _staThread.Start(); } public bool IsConnected @@ -152,6 +159,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _readSemaphore.Dispose(); _writeSemaphore.Dispose(); _reconnectCts?.Dispose(); + _staThread.Dispose(); } } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs new file mode 100644 index 0000000..9a35afb --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs @@ -0,0 +1,133 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Windows.Forms; +using Serilog; + +namespace ZB.MOM.WW.LmxProxy.Host.MxAccess +{ + /// + /// Dedicated STA thread with a Windows message pump for COM interop. + /// All MxAccess COM objects must be created and called on this thread + /// so that COM callbacks (OnDataChange, OnWriteComplete) are delivered + /// via the message loop. + /// + public sealed class StaComThread : IDisposable + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly Thread _thread; + private readonly TaskCompletionSource _ready = new TaskCompletionSource(); + private SynchronizationContext _syncContext = null!; + private bool _disposed; + + public StaComThread() + { + _thread = new Thread(ThreadEntry) + { + Name = "MxAccess-STA", + IsBackground = true + }; + _thread.SetApartmentState(ApartmentState.STA); + } + + /// + /// Starts the STA thread and waits until the message pump is running. + /// + public void Start() + { + _thread.Start(); + _ready.Task.GetAwaiter().GetResult(); + Log.Information("STA COM thread started (ThreadId={ThreadId})", _thread.ManagedThreadId); + } + + /// + /// Marshals a synchronous action onto the STA thread and returns a Task + /// that completes when the action finishes. + /// + public Task RunAsync(Action action) + { + if (_disposed) throw new ObjectDisposedException(nameof(StaComThread)); + + var tcs = new TaskCompletionSource(); + _syncContext.Post(_ => + { + try + { + action(); + tcs.TrySetResult(true); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }, null); + return tcs.Task; + } + + /// + /// Marshals a synchronous function onto the STA thread and returns + /// a Task<T> with the result. + /// + public Task RunAsync(Func func) + { + if (_disposed) throw new ObjectDisposedException(nameof(StaComThread)); + + var tcs = new TaskCompletionSource(); + _syncContext.Post(_ => + { + try + { + tcs.TrySetResult(func()); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }, null); + return tcs.Task; + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + try + { + // Post Application.ExitThread to break out of the message loop + _syncContext?.Post(_ => Application.ExitThread(), null); + _thread.Join(TimeSpan.FromSeconds(5)); + } + catch (Exception ex) + { + Log.Warning(ex, "Error shutting down STA COM thread"); + } + + Log.Information("STA COM thread stopped"); + } + + private void ThreadEntry() + { + try + { + // Install a WindowsFormsSynchronizationContext so that + // Post/Send dispatches onto this thread's message loop + Application.OleRequired(); + var ctx = new WindowsFormsSynchronizationContext(); + SynchronizationContext.SetSynchronizationContext(ctx); + _syncContext = ctx; + + _ready.TrySetResult(true); + + // Run the message loop — this blocks until Application.ExitThread() + Application.Run(); + } + catch (Exception ex) + { + Log.Error(ex, "STA COM thread crashed"); + _ready.TrySetException(ex); + } + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj index bbf30a3..9513396 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj @@ -42,6 +42,7 @@ + ..\..\lib\ArchestrA.MXAccess.dll true