diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs index 9a35afb..b1e31c5 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaComThread.cs @@ -1,24 +1,29 @@ using System; +using System.Collections.Concurrent; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; -using System.Windows.Forms; using Serilog; namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { /// - /// Dedicated STA thread with a Windows message pump for COM interop. + /// Dedicated STA thread with a raw Win32 message pump for COM interop. /// All MxAccess COM objects must be created and called on this thread /// so that COM callbacks (OnDataChange, OnWriteComplete) are delivered /// via the message loop. /// public sealed class StaComThread : IDisposable { + private const uint WM_APP = 0x8000; + private const uint PM_NOREMOVE = 0x0000; + private static readonly ILogger Log = Serilog.Log.ForContext(); private readonly Thread _thread; private readonly TaskCompletionSource _ready = new TaskCompletionSource(); - private SynchronizationContext _syncContext = null!; + private readonly ConcurrentQueue _workItems = new ConcurrentQueue(); + private volatile uint _nativeThreadId; private bool _disposed; public StaComThread() @@ -50,7 +55,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess if (_disposed) throw new ObjectDisposedException(nameof(StaComThread)); var tcs = new TaskCompletionSource(); - _syncContext.Post(_ => + _workItems.Enqueue(() => { try { @@ -61,7 +66,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { tcs.TrySetException(ex); } - }, null); + }); + PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero); return tcs.Task; } @@ -74,7 +80,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess if (_disposed) throw new ObjectDisposedException(nameof(StaComThread)); var tcs = new TaskCompletionSource(); - _syncContext.Post(_ => + _workItems.Enqueue(() => { try { @@ -84,7 +90,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { tcs.TrySetException(ex); } - }, null); + }); + PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero); return tcs.Task; } @@ -95,8 +102,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess try { - // Post Application.ExitThread to break out of the message loop - _syncContext?.Post(_ => Application.ExitThread(), null); + if (_nativeThreadId != 0) + PostThreadMessage(_nativeThreadId, WM_APP + 1, IntPtr.Zero, IntPtr.Zero); _thread.Join(TimeSpan.FromSeconds(5)); } catch (Exception ex) @@ -111,17 +118,33 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { try { - // Install a WindowsFormsSynchronizationContext so that - // Post/Send dispatches onto this thread's message loop - Application.OleRequired(); - var ctx = new WindowsFormsSynchronizationContext(); - SynchronizationContext.SetSynchronizationContext(ctx); - _syncContext = ctx; + _nativeThreadId = GetCurrentThreadId(); + + // Force message queue creation by peeking + MSG msg; + PeekMessage(out msg, IntPtr.Zero, 0, 0, PM_NOREMOVE); _ready.TrySetResult(true); - // Run the message loop — this blocks until Application.ExitThread() - Application.Run(); + // Run the message loop — blocks until WM_QUIT + while (GetMessage(out msg, IntPtr.Zero, 0, 0) > 0) + { + if (msg.message == WM_APP) + { + DrainQueue(); + } + else if (msg.message == WM_APP + 1) + { + // Shutdown signal — drain remaining work then quit + DrainQueue(); + PostQuitMessage(0); + } + else + { + TranslateMessage(ref msg); + DispatchMessage(ref msg); + } + } } catch (Exception ex) { @@ -129,5 +152,66 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _ready.TrySetException(ex); } } + + private void DrainQueue() + { + while (_workItems.TryDequeue(out var workItem)) + { + try + { + workItem(); + } + catch (Exception ex) + { + Log.Error(ex, "Unhandled exception in STA work item"); + } + } + } + + #region Win32 PInvoke + + [StructLayout(LayoutKind.Sequential)] + private struct MSG + { + public IntPtr hwnd; + public uint message; + public IntPtr wParam; + public IntPtr lParam; + public uint time; + public POINT pt; + } + + [StructLayout(LayoutKind.Sequential)] + private struct POINT + { + public int x; + public int y; + } + + [DllImport("user32.dll")] + private static extern int GetMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + private static extern bool TranslateMessage(ref MSG lpMsg); + + [DllImport("user32.dll")] + private static extern IntPtr DispatchMessage(ref MSG lpMsg); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + private static extern bool PostThreadMessage(uint idThread, uint Msg, IntPtr wParam, IntPtr lParam); + + [DllImport("user32.dll")] + private static extern void PostQuitMessage(int nExitCode); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + private static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg); + + [DllImport("kernel32.dll")] + private static extern uint GetCurrentThreadId(); + + #endregion } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj index 9513396..bbf30a3 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj @@ -42,7 +42,6 @@ - ..\..\lib\ArchestrA.MXAccess.dll true