using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;

/// <summary>
/// Dedicated STA thread with a Win32 message pump that owns all LMXProxyServer COM
/// instances. Lifted from v1 StaComThread per CLAUDE.md "Reference Implementation".
/// Per driver-stability.md Galaxy deep dive §"STA thread + Win32 message pump":
/// work items dispatched via PostThreadMessage(WM_APP); WM_APP+1 requests a
/// graceful drain → WM_QUIT; supervisor escalates to Environment.Exit(2) if the
/// pump doesn't drain within the recycle grace window.
/// </summary>
public sealed class StaPump : IDisposable
{
    private const uint WM_APP = 0x8000;
    private const uint WM_DRAIN_AND_QUIT = WM_APP + 1;
    private const uint PM_NOREMOVE = 0x0000;

    private const string PumpExitedMessage = "STA pump has exited";

    private readonly Thread _thread;
    private readonly ConcurrentQueue<WorkItem> _workItems = new();
    private readonly TaskCompletionSource<bool> _started =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Orders the start-up / shutdown hand-off between the pump thread and callers:
    // _nativeThreadId, _pumpExited and _disposed are written under this lock wherever a
    // caller's decision depends on seeing them consistently with the work queue.
    private readonly object _stateGate = new();

    private volatile uint _nativeThreadId;
    private volatile bool _pumpExited;
    private volatile bool _disposed;

    /// <summary>Managed thread id of the pump thread.</summary>
    public int ThreadId => _thread.ManagedThreadId;

    /// <summary>
    /// UTC time at which the pump last finished executing a work item;
    /// <see cref="DateTime.MinValue"/> until the first item has run.
    /// </summary>
    public DateTime LastDispatchedUtc { get; private set; } = DateTime.MinValue;

    /// <summary>Number of work items currently waiting in the queue.</summary>
    public int QueueDepth => _workItems.Count;

    /// <summary>
    /// True once the pump thread has published its native thread id and the pump has
    /// neither exited nor been disposed.
    /// </summary>
    public bool IsRunning => _nativeThreadId != 0 && !_disposed && !_pumpExited;

    /// <summary>Creates the pump and immediately starts its background STA thread.</summary>
    /// <param name="name">Name assigned to the pump thread.</param>
    public StaPump(string name = "Galaxy.Sta")
    {
        _thread = new Thread(PumpLoop) { Name = name, IsBackground = true };
        _thread.SetApartmentState(ApartmentState.STA);
        _thread.Start();
    }

    /// <summary>
    /// Completes once the pump thread has created its message queue, or faults if the
    /// pump thread threw before reaching that point.
    /// </summary>
    public Task WaitForStartedAsync() => _started.Task;

    /// <summary>Posts a work item; resolves once it's executed on the STA thread.</summary>
    /// <typeparam name="T">Result type of the work item.</typeparam>
    /// <param name="work">Delegate to run on the STA thread.</param>
    /// <returns>
    /// A task carrying the delegate's result, the exception it threw, or an
    /// <see cref="InvalidOperationException"/> if the pump exited before running it.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The pump has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The pump has already exited.</exception>
    public Task<T> InvokeAsync<T>(Func<T> work)
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(StaPump));
        }

        if (_pumpExited)
        {
            throw new InvalidOperationException(PumpExitedMessage);
        }

        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        _workItems.Enqueue(new WorkItem(
            () =>
            {
                try
                {
                    tcs.TrySetResult(work());
                }
                catch (Exception ex)
                {
                    tcs.TrySetException(ex);
                }
            },
            ex => tcs.TrySetException(ex)));

        uint threadId;
        bool pumpExited;
        lock (_stateGate)
        {
            threadId = _nativeThreadId;
            pumpExited = _pumpExited;
        }

        // threadId == 0 means the pump thread has not published itself yet. The item is
        // already queued, and the pump drains the queue once right after publishing, so
        // no wake-up message is needed (and posting to thread id 0 would only fail).
        var wakeUpFailed = !pumpExited
            && threadId != 0
            && !PostThreadMessage(threadId, WM_APP, IntPtr.Zero, IntPtr.Zero);

        if (pumpExited || wakeUpFailed)
        {
            // Nobody is left to run queued items; fault them so no caller waits forever.
            _pumpExited = true;
            DrainAndFaultQueue();
        }

        return tcs.Task;
    }

    /// <summary>Posts a result-less work item; resolves once it's executed on the STA thread.</summary>
    /// <param name="work">Delegate to run on the STA thread.</param>
    public Task InvokeAsync(Action work) => InvokeAsync(() => { work(); return 0; });

    /// <summary>
    /// Health probe — returns true if a no-op work item round-trips within
    /// <paramref name="timeout"/>. Used by the supervisor; timeout means the pump is wedged
    /// and a recycle is warranted (Task B.2 acceptance).
    /// </summary>
    /// <param name="timeout">Maximum time to wait for the no-op item to execute.</param>
    /// <returns>
    /// True only if the probe item actually ran; false if the pump is not running, the
    /// probe was faulted because the pump exited, or the timeout elapsed first.
    /// </returns>
    public async Task<bool> IsResponsiveAsync(TimeSpan timeout)
    {
        if (!IsRunning)
        {
            return false;
        }

        Task probe;
        try
        {
            probe = InvokeAsync(() => { });
        }
        catch (InvalidOperationException)
        {
            // Disposed or exited between the IsRunning check and the post
            // (ObjectDisposedException derives from InvalidOperationException).
            return false;
        }

        using var timerCancellation = new CancellationTokenSource();
        var timer = Task.Delay(timeout, timerCancellation.Token);
        var winner = await Task.WhenAny(probe, timer).ConfigureAwait(false);

        // Release the timer promptly instead of leaving it armed until the timeout.
        timerCancellation.Cancel();

        if (winner != probe)
        {
            // Timed out. If the probe is faulted later, observe that exception so it is
            // not reported as unobserved.
            _ = probe.ContinueWith(
                t => { _ = t.Exception; },
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
            return false;
        }

        if (probe.Status != TaskStatus.RanToCompletion)
        {
            // The probe finished by being faulted (pump exited), not by executing.
            _ = probe.Exception;
            return false;
        }

        return true;
    }

    /// <summary>
    /// Body of the pump thread: creates the message queue, publishes the native thread id,
    /// then dispatches messages until WM_QUIT, an error, or an exception.
    /// </summary>
    private void PumpLoop()
    {
        try
        {
            // Force the system to create the thread message queue before the thread id is
            // published or Started is signalled, so any PostThreadMessage issued after the
            // id becomes visible has a queue to land in.
            // PeekMessage(PM_NOREMOVE) on an empty queue is the documented way to do this.
            PeekMessage(out _, IntPtr.Zero, 0, 0, PM_NOREMOVE);

            var nativeThreadId = GetCurrentThreadId();
            bool disposedBeforeStart;
            lock (_stateGate)
            {
                _nativeThreadId = nativeThreadId;
                disposedBeforeStart = _disposed;
            }

            _started.TrySetResult(true);

            if (disposedBeforeStart)
            {
                // Dispose ran before the id was published and so could not post the quit
                // message; exit here instead of pumping forever.
                return;
            }

            // Run items that were queued before the id was published; their callers
            // skipped the WM_APP wake-up.
            DrainQueue();

            // GetMessage returns 0 on WM_QUIT, -1 on error, otherwise a positive value.
            while (GetMessage(out var msg, IntPtr.Zero, 0, 0) > 0)
            {
                switch (msg.message)
                {
                    case WM_APP:
                        DrainQueue();
                        break;

                    case WM_DRAIN_AND_QUIT:
                        DrainQueue();
                        PostQuitMessage(0);
                        break;

                    default:
                        // Pass through any window/dialog messages the COM proxy may inject.
                        TranslateMessage(ref msg);
                        DispatchMessage(ref msg);
                        break;
                }
            }
        }
        catch (Exception ex)
        {
            // Only has an effect if the failure happened before Started was signalled.
            _started.TrySetException(ex);
        }
        finally
        {
            lock (_stateGate)
            {
                _pumpExited = true;
            }

            DrainAndFaultQueue();
        }
    }

    /// <summary>Executes every queued work item, stamping the dispatch time after each.</summary>
    private void DrainQueue()
    {
        while (_workItems.TryDequeue(out var item))
        {
            item.Execute();
            LastDispatchedUtc = DateTime.UtcNow;
        }
    }

    /// <summary>Fails every queued work item with an <see cref="InvalidOperationException"/>.</summary>
    private void DrainAndFaultQueue()
    {
        var exited = new InvalidOperationException(PumpExitedMessage);
        while (_workItems.TryDequeue(out var item))
        {
            try
            {
                item.Fault(exited);
            }
            catch
            {
                /* faulting a TCS shouldn't throw, but be defensive */
            }
        }
    }

    /// <summary>
    /// Requests a graceful drain-and-quit, waits up to five seconds for the pump thread to
    /// finish, then faults whatever is still queued. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        uint threadId;
        bool pumpExited;
        lock (_stateGate)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            threadId = _nativeThreadId;
            pumpExited = _pumpExited;
        }

        try
        {
            if (threadId != 0 && !pumpExited)
            {
                PostThreadMessage(threadId, WM_DRAIN_AND_QUIT, IntPtr.Zero, IntPtr.Zero);
            }

            // When called from a work item running on the pump thread, joining would only
            // wait on ourselves until the timeout; skip it.
            if (Thread.CurrentThread != _thread)
            {
                _thread.Join(TimeSpan.FromSeconds(5));
            }
        }
        catch
        {
            /* swallow — best effort; Dispose must not throw */
        }

        DrainAndFaultQueue();
    }

    /// <summary>A queued unit of work: one callback to run it, one to fail it.</summary>
    private sealed record WorkItem(Action Execute, Action<Exception> Fault);

    #region Win32 P/Invoke

    [StructLayout(LayoutKind.Sequential)]
    private struct MSG
    {
        public IntPtr hwnd;
        public uint message;
        public IntPtr wParam;
        public IntPtr lParam;
        public uint time;
        public POINT pt;
    }

    [StructLayout(LayoutKind.Sequential)]
    private struct POINT
    {
        public int x;
        public int y;
    }

    [DllImport("user32.dll")]
    private static extern int GetMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool TranslateMessage(ref MSG lpMsg);

    [DllImport("user32.dll")]
    private static extern IntPtr DispatchMessage(ref MSG lpMsg);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool PostThreadMessage(uint idThread, uint Msg, IntPtr wParam, IntPtr lParam);

    [DllImport("user32.dll")]
    private static extern void PostQuitMessage(int nExitCode);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg);

    [DllImport("kernel32.dll")]
    private static extern uint GetCurrentThreadId();

    #endregion
}