Fix second-pass review findings: subscription leak on rebuild, metrics accuracy, and MxAccess startup recovery

- Preserve and replay subscription ref counts across address space rebuilds to prevent MXAccess subscription leaks
- Mark read timeouts and write failures as unsuccessful in PerformanceMetrics for accurate health reporting
- Add deferred MxAccess reconnect path when initial connection fails at startup
- Update code review document with verified completions and new findings
- Add covering tests for all fixes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-25 09:41:12 -04:00
parent 71254e005e
commit 09ed15bdda
12 changed files with 307 additions and 51 deletions

View File

@@ -18,8 +18,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
{
_connectionHandle = await _staThread.RunAsync(() =>
{
_proxy.OnDataChange += HandleOnDataChange;
_proxy.OnWriteComplete += HandleOnWriteComplete;
AttachProxyEvents();
return _proxy.Register(_config.ClientName);
});
@@ -34,12 +33,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
{
_probeTag = _config.ProbeTag;
_lastProbeValueTime = DateTime.UtcNow;
await SubscribeInternalAsync(_probeTag);
await SubscribeInternalAsync(_probeTag!);
Log.Information("Probe tag subscribed: {ProbeTag}", _probeTag);
}
}
catch (Exception ex)
{
try
{
await _staThread.RunAsync(DetachProxyEvents);
}
catch (Exception cleanupEx)
{
Log.Warning(cleanupEx, "Failed to detach proxy events after connection failure");
}
Log.Error(ex, "MxAccess connection failed");
SetState(ConnectionState.Error, ex.Message);
throw;
@@ -70,8 +78,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
}
// Unwire events before unregister
_proxy.OnDataChange -= HandleOnDataChange;
_proxy.OnWriteComplete -= HandleOnWriteComplete;
DetachProxyEvents();
// Unregister
try { _proxy.Unregister(_connectionHandle); }
@@ -80,6 +87,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
_handleToAddress.Clear();
_addressToHandle.Clear();
_pendingReadsByAddress.Clear();
_pendingWrites.Clear();
}
catch (Exception ex)
@@ -109,5 +117,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
SetState(ConnectionState.Error, ex.Message);
}
}
private void AttachProxyEvents()
{
if (_proxyEventsAttached) return;
_proxy.OnDataChange += HandleOnDataChange;
_proxy.OnWriteComplete += HandleOnWriteComplete;
_proxyEventsAttached = true;
}
private void DetachProxyEvents()
{
if (!_proxyEventsAttached) return;
_proxy.OnDataChange -= HandleOnDataChange;
_proxy.OnWriteComplete -= HandleOnWriteComplete;
_proxyEventsAttached = false;
}
}
}

View File

@@ -50,6 +50,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
callback(address, vtq);
}
if (_pendingReadsByAddress.TryGetValue(address, out var pendingReads))
{
foreach (var pendingRead in pendingReads.Values)
pendingRead.TrySetResult(vtq);
}
// Global handler
OnTagValueChanged?.Invoke(address, vtq);
}

View File

@@ -19,9 +19,6 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
using var scope = _metrics.BeginOperation("Read");
var tcs = new TaskCompletionSource<Vtq>();
// Subscribe, get first value, unsubscribe
void OnValue(string addr, Vtq vtq) => tcs.TrySetResult(vtq);
var itemHandle = await _staThread.RunAsync(() =>
{
var h = _proxy.AddItem(_connectionHandle, fullTagReference);
@@ -29,9 +26,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
return h;
});
var pendingReads = _pendingReadsByAddress.GetOrAdd(fullTagReference,
_ => new System.Collections.Concurrent.ConcurrentDictionary<int, TaskCompletionSource<Vtq>>());
pendingReads[itemHandle] = tcs;
_handleToAddress[itemHandle] = fullTagReference;
_addressToHandle[fullTagReference] = itemHandle;
_storedSubscriptions[fullTagReference] = OnValue;
try
{
@@ -39,7 +37,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
cts.CancelAfter(TimeSpan.FromSeconds(_config.ReadTimeoutSeconds));
cts.Token.Register(() => tcs.TrySetResult(Vtq.Bad(Quality.BadCommFailure)));
return await tcs.Task;
var result = await tcs.Task;
if (result.Quality != Quality.Good)
scope.SetSuccess(false);
return result;
}
catch
{
@@ -48,9 +50,14 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
}
finally
{
_storedSubscriptions.TryRemove(fullTagReference, out _);
if (_pendingReadsByAddress.TryGetValue(fullTagReference, out var reads))
{
reads.TryRemove(itemHandle, out _);
if (reads.IsEmpty)
_pendingReadsByAddress.TryRemove(fullTagReference, out _);
}
_handleToAddress.TryRemove(itemHandle, out _);
_addressToHandle.TryRemove(fullTagReference, out _);
try
{
@@ -89,7 +96,6 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
});
_handleToAddress[itemHandle] = fullTagReference;
_addressToHandle[fullTagReference] = itemHandle;
var tcs = new TaskCompletionSource<bool>();
_pendingWrites[itemHandle] = tcs;
@@ -106,7 +112,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
tcs.TrySetResult(false);
});
return await tcs.Task;
var success = await tcs.Task;
if (!success)
scope.SetSuccess(false);
return success;
}
catch (Exception ex)
{
@@ -118,7 +128,6 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
{
_pendingWrites.TryRemove(itemHandle, out _);
_handleToAddress.TryRemove(itemHandle, out _);
_addressToHandle.TryRemove(fullTagReference, out _);
try
{

View File

@@ -27,6 +27,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
private int _connectionHandle;
private volatile ConnectionState _state = ConnectionState.Disconnected;
private bool _proxyEventsAttached;
private CancellationTokenSource? _monitorCts;
// Handle mappings
@@ -40,6 +41,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
// Pending writes
private readonly ConcurrentDictionary<int, TaskCompletionSource<bool>> _pendingWrites
= new ConcurrentDictionary<int, TaskCompletionSource<bool>>();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<int, TaskCompletionSource<Vtq>>> _pendingReadsByAddress
= new ConcurrentDictionary<string, ConcurrentDictionary<int, TaskCompletionSource<Vtq>>>(StringComparer.OrdinalIgnoreCase);
// Probe
private string? _probeTag;

View File

@@ -145,15 +145,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
lock (Lock)
{
Log.Information("Rebuilding address space...");
var activeSubscriptions = new Dictionary<string, int>(_subscriptionRefCounts, StringComparer.OrdinalIgnoreCase);
// Remove all predefined nodes
var nodesToRemove = new List<NodeId>();
foreach (var kvp in _nodeIdToTagReference)
foreach (var tagRef in activeSubscriptions.Keys)
{
var nodeId = new NodeId(kvp.Key, NamespaceIndex);
nodesToRemove.Add(nodeId);
try
{
_mxAccessClient.UnsubscribeAsync(tagRef).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to unsubscribe {TagRef} during rebuild", tagRef);
}
}
// Remove all predefined nodes
foreach (var nodeId in PredefinedNodes.Keys.ToList())
{
try { DeleteNode(SystemContext, nodeId); }
@@ -167,6 +173,23 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
// Rebuild
BuildAddressSpace(hierarchy, attributes);
foreach (var kvp in activeSubscriptions)
{
if (!_tagToVariableNode.ContainsKey(kvp.Key))
continue;
try
{
_mxAccessClient.SubscribeAsync(kvp.Key, (_, _) => { }).GetAwaiter().GetResult();
_subscriptionRefCounts[kvp.Key] = kvp.Value;
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to restore subscription for {TagRef} after rebuild", kvp.Key);
}
}
Log.Information("Address space rebuild complete");
}
}

View File

@@ -109,14 +109,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
_staThread = new StaComThread();
_staThread.Start();
_mxAccessClient = new MxAccessClient(_staThread, _mxProxy, _config.MxAccess, _metrics);
_mxAccessClient.ConnectAsync(_cts.Token).GetAwaiter().GetResult();
try
{
_mxAccessClient.ConnectAsync(_cts.Token).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "MxAccess connection failed at startup - monitor will continue retrying in the background");
}
// Step 6: Start monitor loop
// Step 6: Start monitor loop even if initial connect failed
_mxAccessClient.StartMonitor();
}
catch (Exception ex)
{
Log.Warning(ex, "MxAccess connection failed continuing without runtime data access");
Log.Warning(ex, "MxAccess initialization failed - continuing without runtime data access");
_mxAccessClient?.Dispose();
_mxAccessClient = null;
_staThread?.Dispose();