using System; using System.Collections.Concurrent; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Serilog; namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess { /// /// Dedicated STA thread with a raw Win32 message pump for COM interop. /// All MxAccess COM objects must be created and called on this thread. (MXA-001) /// public sealed class StaComThread : IDisposable { private const uint WM_APP = 0x8000; private const uint PM_NOREMOVE = 0x0000; private static readonly ILogger Log = Serilog.Log.ForContext(); private static readonly TimeSpan PumpLogInterval = TimeSpan.FromMinutes(5); private readonly TaskCompletionSource _ready = new(); private readonly Thread _thread; private readonly ConcurrentQueue _workItems = new(); private long _appMessages; private long _dispatchedMessages; private bool _disposed; private DateTime _lastLogTime; private volatile uint _nativeThreadId; private volatile bool _pumpExited; private long _totalMessages; private long _workItemsExecuted; /// /// Initializes a dedicated STA thread wrapper for Wonderware COM interop. /// public StaComThread() { _thread = new Thread(ThreadEntry) { Name = "MxAccess-STA", IsBackground = true }; _thread.SetApartmentState(ApartmentState.STA); } /// /// Gets a value indicating whether the STA thread is running and able to accept work. /// public bool IsRunning => _nativeThreadId != 0 && !_disposed && !_pumpExited; /// /// Stops the STA thread and releases the message-pump resources used for COM interop. /// public void Dispose() { if (_disposed) return; _disposed = true; try { if (_nativeThreadId != 0 && !_pumpExited) PostThreadMessage(_nativeThreadId, WM_APP + 1, IntPtr.Zero, IntPtr.Zero); _thread.Join(TimeSpan.FromSeconds(5)); } catch (Exception ex) { Log.Warning(ex, "Error shutting down STA COM thread"); } DrainAndFaultQueue(); Log.Information("STA COM thread stopped"); } /// /// Starts the STA thread and waits until its message pump is ready for COM work. /// public void Start() { _thread.Start(); _ready.Task.GetAwaiter().GetResult(); Log.Information("STA COM thread started (ThreadId={ThreadId})", _thread.ManagedThreadId); } /// /// Queues an action to execute on the STA thread. /// /// The work item to execute on the STA thread. /// A task that completes when the action has finished executing. public Task RunAsync(Action action) { if (_disposed) throw new ObjectDisposedException(nameof(StaComThread)); if (_pumpExited) throw new InvalidOperationException("STA COM thread pump has exited"); var tcs = new TaskCompletionSource(); _workItems.Enqueue(new WorkItem { Execute = () => { try { action(); tcs.TrySetResult(true); } catch (Exception ex) { tcs.TrySetException(ex); } }, Fault = ex => tcs.TrySetException(ex) }); if (!PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero)) { _pumpExited = true; DrainAndFaultQueue(); } return tcs.Task; } /// /// Queues a function to execute on the STA thread and returns its result. /// /// The result type produced by the function. /// The work item to execute on the STA thread. /// A task that completes with the function result. public Task RunAsync(Func func) { if (_disposed) throw new ObjectDisposedException(nameof(StaComThread)); if (_pumpExited) throw new InvalidOperationException("STA COM thread pump has exited"); var tcs = new TaskCompletionSource(); _workItems.Enqueue(new WorkItem { Execute = () => { try { tcs.TrySetResult(func()); } catch (Exception ex) { tcs.TrySetException(ex); } }, Fault = ex => tcs.TrySetException(ex) }); if (!PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero)) { _pumpExited = true; DrainAndFaultQueue(); } return tcs.Task; } private void ThreadEntry() { try { _nativeThreadId = GetCurrentThreadId(); MSG msg; PeekMessage(out msg, IntPtr.Zero, 0, 0, PM_NOREMOVE); _ready.TrySetResult(true); _lastLogTime = DateTime.UtcNow; Log.Debug("STA message pump entering loop"); while (GetMessage(out msg, IntPtr.Zero, 0, 0) > 0) { _totalMessages++; if (msg.message == WM_APP) { _appMessages++; DrainQueue(); } else if (msg.message == WM_APP + 1) { DrainQueue(); PostQuitMessage(0); } else { _dispatchedMessages++; TranslateMessage(ref msg); DispatchMessage(ref msg); } LogPumpStatsIfDue(); } Log.Information( "STA message pump exited (Total={Total}, App={App}, Dispatched={Dispatched}, WorkItems={WorkItems})", _totalMessages, _appMessages, _dispatchedMessages, _workItemsExecuted); } catch (Exception ex) { Log.Error(ex, "STA COM thread crashed"); _ready.TrySetException(ex); } finally { _pumpExited = true; DrainAndFaultQueue(); } } private void DrainQueue() { while (_workItems.TryDequeue(out var workItem)) { _workItemsExecuted++; try { workItem.Execute(); } catch (Exception ex) { Log.Error(ex, "Unhandled exception in STA work item"); } } } private void DrainAndFaultQueue() { var faultException = new InvalidOperationException("STA COM thread pump has exited"); while (_workItems.TryDequeue(out var workItem)) { try { workItem.Fault(faultException); } catch { // Faulting a TCS should not throw, but guard against it } } } private void LogPumpStatsIfDue() { var now = DateTime.UtcNow; if (now - _lastLogTime < PumpLogInterval) return; Log.Debug( "STA pump alive: Total={Total}, App={App}, Dispatched={Dispatched}, WorkItems={WorkItems}, Pending={Pending}", _totalMessages, _appMessages, _dispatchedMessages, _workItemsExecuted, _workItems.Count); _lastLogTime = now; } private sealed class WorkItem { public Action Execute { get; set; } public Action Fault { get; set; } } #region Win32 PInvoke [StructLayout(LayoutKind.Sequential)] private struct MSG { public IntPtr hwnd; public uint message; public IntPtr wParam; public IntPtr lParam; public uint time; public POINT pt; } [StructLayout(LayoutKind.Sequential)] private struct POINT { public int x; public int y; } [DllImport("user32.dll")] private static extern int GetMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax); [DllImport("user32.dll")] [return: MarshalAs(UnmanagedType.Bool)] private static extern bool TranslateMessage(ref MSG lpMsg); [DllImport("user32.dll")] private static extern IntPtr DispatchMessage(ref MSG lpMsg); [DllImport("user32.dll")] [return: MarshalAs(UnmanagedType.Bool)] private static extern bool PostThreadMessage(uint idThread, uint Msg, IntPtr wParam, IntPtr lParam); [DllImport("user32.dll")] private static extern void PostQuitMessage(int nExitCode); [DllImport("user32.dll")] [return: MarshalAs(UnmanagedType.Bool)] private static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg); [DllImport("kernel32.dll")] private static extern uint GetCurrentThreadId(); #endregion } }