fix(lmxproxy): resolve write timeout — bypass OnWriteComplete callback for supervisory writes
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -69,6 +69,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
|
||||
/// <summary>
|
||||
/// COM event handler for MxAccess OnWriteComplete events.
|
||||
/// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface.
|
||||
/// Kept wired for diagnostic logging only — writes are resolved synchronously
|
||||
/// when the Write() COM call returns without throwing.
|
||||
/// </summary>
|
||||
private void OnWriteComplete(
|
||||
int hLMXServerHandle,
|
||||
@@ -77,57 +79,23 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
|
||||
{
|
||||
try
|
||||
{
|
||||
TaskCompletionSource<bool> tcs;
|
||||
lock (_lock)
|
||||
{
|
||||
if (!_pendingWrites.TryGetValue(phItemHandle, out tcs))
|
||||
{
|
||||
Log.Debug("WriteComplete for unknown handle {Handle}", phItemHandle);
|
||||
return;
|
||||
}
|
||||
|
||||
_pendingWrites.Remove(phItemHandle);
|
||||
}
|
||||
|
||||
if (ItemStatus != null && ItemStatus.Length > 0)
|
||||
{
|
||||
var status = ItemStatus[0];
|
||||
if (status.success == 0)
|
||||
{
|
||||
string errorMsg = GetWriteErrorMessage(status.detail);
|
||||
Log.Warning("Write failed for handle {Handle}: {Error} (Category={Category}, Detail={Detail})",
|
||||
Log.Warning("OnWriteComplete callback: write failed for handle {Handle}: {Error} (Category={Category}, Detail={Detail})",
|
||||
phItemHandle, errorMsg, status.category, status.detail);
|
||||
tcs.TrySetException(new InvalidOperationException(
|
||||
string.Format("Write failed: {0}", errorMsg)));
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.Debug("Write completed successfully for handle {Handle}", phItemHandle);
|
||||
tcs.TrySetResult(true);
|
||||
Log.Debug("OnWriteComplete callback: write succeeded for handle {Handle}", phItemHandle);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No status means success
|
||||
Log.Debug("Write completed for handle {Handle} with no status", phItemHandle);
|
||||
tcs.TrySetResult(true);
|
||||
}
|
||||
|
||||
// Clean up the item after write completes
|
||||
lock (_lock)
|
||||
{
|
||||
if (_lmxProxy != null && phItemHandle > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
_lmxProxy.UnAdvise(_connectionHandle, phItemHandle);
|
||||
_lmxProxy.RemoveItem(_connectionHandle, phItemHandle);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Debug(ex, "Error cleaning up after write for handle {Handle}", phItemHandle);
|
||||
}
|
||||
}
|
||||
Log.Debug("OnWriteComplete callback: no status for handle {Handle}", phItemHandle);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
||||
@@ -61,7 +61,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
|
||||
|
||||
/// <summary>
|
||||
/// Writes a single tag value to MxAccess.
|
||||
/// Uses Task.Run for COM calls with OnWriteComplete callback for confirmation.
|
||||
/// Uses Task.Run for COM calls. Write completes synchronously (fire-and-forget).
|
||||
/// </summary>
|
||||
public async Task WriteAsync(string address, object value, CancellationToken ct = default)
|
||||
{
|
||||
@@ -173,32 +173,14 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Internal write implementation using Task.Run for COM calls
|
||||
/// and OnWriteComplete callback for confirmation.
|
||||
/// Internal write implementation using Task.Run for COM calls.
|
||||
/// MxAccess completes supervisory writes synchronously — the Write() call
|
||||
/// succeeding (not throwing) confirms the write. The OnWriteComplete callback
|
||||
/// is kept wired for diagnostic logging but is not awaited.
|
||||
/// </summary>
|
||||
private async Task WriteInternalAsync(string address, object value, CancellationToken ct)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<bool>();
|
||||
int itemHandle = await SetupWriteOperationAsync(address, value, tcs, ct);
|
||||
|
||||
try
|
||||
{
|
||||
await WaitForWriteCompletionAsync(tcs, itemHandle, address, ct);
|
||||
}
|
||||
catch
|
||||
{
|
||||
await CleanupWriteOperationAsync(itemHandle);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sets up a write operation on the thread pool and returns the item handle.
|
||||
/// </summary>
|
||||
private async Task<int> SetupWriteOperationAsync(
|
||||
string address, object value, TaskCompletionSource<bool> tcs, CancellationToken ct)
|
||||
{
|
||||
return await Task.Run(() =>
|
||||
await Task.Run(() =>
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
@@ -214,79 +196,38 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
|
||||
// Advise to enable writing
|
||||
_lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
|
||||
|
||||
// Track pending write for OnWriteComplete callback
|
||||
_pendingWrites[itemHandle] = tcs;
|
||||
|
||||
// Write the value (-1 = no security classification)
|
||||
// MxAccess completes simple/supervisory writes synchronously.
|
||||
// If Write() returns without throwing, the write succeeded.
|
||||
_lmxProxy.Write(_connectionHandle, itemHandle, value, -1);
|
||||
|
||||
return itemHandle;
|
||||
Log.Debug("Write completed synchronously for {Address} (handle={Handle})", address, itemHandle);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Clean up on failure
|
||||
Log.Error(ex, "Failed to write value to {Address}", address);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Clean up: UnAdvise + RemoveItem after write (success or failure)
|
||||
if (itemHandle > 0 && _lmxProxy != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
|
||||
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
|
||||
_pendingWrites.Remove(itemHandle);
|
||||
}
|
||||
catch { }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Debug(ex, "Error cleaning up write item for {Address} (handle={Handle})", address, itemHandle);
|
||||
}
|
||||
}
|
||||
Log.Error(ex, "Failed to write value to {Address}", address);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}, ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits for write completion with timeout.
|
||||
/// </summary>
|
||||
private async Task WaitForWriteCompletionAsync(
|
||||
TaskCompletionSource<bool> tcs, int itemHandle, string address, CancellationToken ct)
|
||||
{
|
||||
using (ct.Register(() => tcs.TrySetCanceled()))
|
||||
{
|
||||
var timeoutTask = Task.Delay(_writeTimeoutMs, ct);
|
||||
var completedTask = await Task.WhenAny(tcs.Task, timeoutTask);
|
||||
|
||||
if (completedTask == timeoutTask)
|
||||
{
|
||||
await CleanupWriteOperationAsync(itemHandle);
|
||||
throw new TimeoutException(
|
||||
string.Format("Write operation to {0} timed out after {1}ms", address, _writeTimeoutMs));
|
||||
}
|
||||
|
||||
await tcs.Task; // This will throw if the write failed
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cleans up a write operation (unadvise + remove item).
|
||||
/// </summary>
|
||||
private async Task CleanupWriteOperationAsync(int itemHandle)
|
||||
{
|
||||
await Task.Run(() =>
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_pendingWrites.Remove(itemHandle);
|
||||
if (itemHandle > 0 && _lmxProxy != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
|
||||
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
|
||||
}
|
||||
catch { }
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps an MxAccess OPC DA quality integer to the domain Quality enum.
|
||||
/// </summary>
|
||||
|
||||
Reference in New Issue
Block a user