feat(lmxproxy): re-enable OnWriteComplete callback via STA message pump

With StaComThread's GetMessage loop in place, OnWriteComplete callbacks
are now delivered properly. Write flow: dispatch Write() on STA thread,
await OnWriteComplete via TCS, clean up on STA thread. Falls back to
fire-and-forget on timeout as safety net. OnWriteComplete now resolves
or rejects the TCS with MxStatus error details.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-22 23:35:09 -04:00
parent a326a8cbde
commit 84b7b6a7a9
2 changed files with 75 additions and 34 deletions

View File

@@ -1,5 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
@@ -79,9 +79,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
/// <summary>
/// COM event handler for MxAccess OnWriteComplete events.
/// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface.
/// Kept wired for diagnostic logging only — writes are resolved synchronously
/// when the Write() COM call returns without throwing.
/// Resolves the pending TaskCompletionSource so the caller gets
/// confirmation (or error) from the OnWriteComplete callback.
/// </summary>
private void OnWriteComplete(
int hLMXServerHandle,
@@ -90,22 +89,32 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
try
{
TaskCompletionSource<bool> tcs;
bool hasPending;
lock (_lock)
{
hasPending = _pendingWrites.TryGetValue(phItemHandle, out tcs);
}
if (ItemStatus != null && ItemStatus.Length > 0)
{
var status = ItemStatus[0];
if (status.success == 0)
{
Log.Warning("OnWriteComplete callback: write failed for handle {Handle}: {Status}",
phItemHandle, MxStatusMapper.FormatStatus(status.detail, (int)status.category, (int)status.detectedBy));
string errorMsg = MxStatusMapper.FormatStatus(status.detail, (int)status.category, (int)status.detectedBy);
Log.Warning("OnWriteComplete: write failed for handle {Handle}: {Status}", phItemHandle, errorMsg);
if (hasPending) tcs.TrySetException(new InvalidOperationException("Write failed: " + errorMsg));
}
else
{
Log.Debug("OnWriteComplete callback: write succeeded for handle {Handle}", phItemHandle);
Log.Debug("OnWriteComplete: write succeeded for handle {Handle}", phItemHandle);
if (hasPending) tcs.TrySetResult(true);
}
}
else
{
Log.Debug("OnWriteComplete callback: no status for handle {Handle}", phItemHandle);
Log.Debug("OnWriteComplete: no status for handle {Handle}", phItemHandle);
tcs?.TrySetResult(true);
}
}
catch (Exception ex)

View File

@@ -184,12 +184,16 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
/// <summary>
/// Internal write implementation dispatched on the STA thread.
/// MxAccess completes supervisory writes synchronously — the Write() call
/// succeeding (not throwing) confirms the write. The OnWriteComplete callback
/// is kept wired for diagnostic logging but is not awaited.
/// Registers a TaskCompletionSource, calls Write(), then awaits the
/// OnWriteComplete callback via the STA message pump. Falls back to
/// fire-and-forget if the callback doesn't arrive within the timeout.
/// </summary>
private async Task WriteInternalAsync(string address, object value, CancellationToken ct)
{
var tcs = new TaskCompletionSource<bool>();
int itemHandle = 0;
// Step 1: Setup and write on the STA thread
await _staThread.RunAsync(() =>
{
lock (_lock)
@@ -197,24 +201,23 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
if (!IsConnected || _lmxProxy == null)
throw new InvalidOperationException("Not connected to MxAccess");
int itemHandle = 0;
try
{
// Add the item
itemHandle = _lmxProxy.AddItem(_connectionHandle, address);
// Advise to enable writing
_lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
// Register for OnWriteComplete callback
_pendingWrites[itemHandle] = tcs;
// Write the value (-1 = no security classification)
// MxAccess completes simple/supervisory writes synchronously.
// If Write() returns without throwing, the write succeeded.
_lmxProxy.Write(_connectionHandle, itemHandle, value, -1);
Log.Debug("Write completed synchronously for {Address} (handle={Handle})", address, itemHandle);
Log.Debug("Write dispatched for {Address} (handle={Handle}), awaiting OnWriteComplete",
address, itemHandle);
}
catch (System.Runtime.InteropServices.COMException comEx)
{
_pendingWrites.Remove(itemHandle);
string enriched = string.Format("Write failed for '{0}': COM error 0x{1:X8} — {2}",
address, comEx.ErrorCode, comEx.Message);
Log.Error(comEx, "COM write error for {Address}: HRESULT=0x{ErrorCode:X8}",
@@ -223,27 +226,56 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
}
catch (Exception ex)
{
_pendingWrites.Remove(itemHandle);
Log.Error(ex, "Failed to write value to {Address}", address);
throw;
}
finally
{
// Clean up: UnAdvise + RemoveItem after write (success or failure)
if (itemHandle > 0 && _lmxProxy != null)
{
try
{
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
}
catch (Exception ex)
{
Log.Debug(ex, "Error cleaning up write item for {Address} (handle={Handle})", address, itemHandle);
}
}
}
}
});
// Step 2: Wait for OnWriteComplete callback (delivered via STA message pump)
try
{
using (var cts = new CancellationTokenSource(_writeTimeoutMs))
using (ct.Register(() => cts.Cancel()))
{
cts.Token.Register(() => tcs.TrySetResult(true)); // timeout = assume success (fire-and-forget fallback)
await tcs.Task;
}
}
finally
{
// Step 3: Clean up on the STA thread
if (itemHandle > 0)
{
try
{
await _staThread.RunAsync(() =>
{
lock (_lock)
{
_pendingWrites.Remove(itemHandle);
if (_lmxProxy != null && _connectionHandle > 0)
{
try
{
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
}
catch (Exception ex)
{
Log.Debug(ex, "Error cleaning up write item for {Address} (handle={Handle})", address, itemHandle);
}
}
}
});
}
catch (Exception ex)
{
Log.Debug(ex, "Error dispatching write cleanup for {Address}", address);
}
}
}
}
/// <summary>