Fix 5 code review findings (P1-P3)
P1: Wire OPC UA monitored items to MXAccess subscriptions
- Override OnCreateMonitoredItemsComplete/OnDeleteMonitoredItemsComplete
in LmxNodeManager to trigger ref-counted SubscribeTag/UnsubscribeTag
- Clients subscribing to tags now start live MXAccess data pushes
P1: Write timeout now returns false instead of true
- Previously a missing OnWriteComplete callback was treated as success
- Now correctly reports failure so OPC UA clients see the error
P1: Auto-reconnect retries from Error state (not just Disconnected)
- Monitor loop now checks both Disconnected and Error states
- Prevents permanent outages after a single failed reconnect attempt
P2: Topological sort on hierarchy before building address space
- Parents guaranteed to appear before children regardless of input order
- Prevents misplaced nodes when SQL returns unsorted results
P3: Skip redundant first-poll rebuild on startup
- ChangeDetectionService accepts initial deploy time from OpcUaService
- First poll only triggers rebuild if deploy time is actually unknown
- Eliminates duplicate DB fetch and address space rebuild at startup
All 212 tests pass (205 unit + 7 integration).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -22,10 +22,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.GalaxyRepository
|
||||
public event Action? OnGalaxyChanged;
|
||||
public DateTime? LastKnownDeployTime => _lastKnownDeployTime;
|
||||
|
||||
public ChangeDetectionService(IGalaxyRepository repository, int intervalSeconds)
|
||||
public ChangeDetectionService(IGalaxyRepository repository, int intervalSeconds, DateTime? initialDeployTime = null)
|
||||
{
|
||||
_repository = repository;
|
||||
_intervalSeconds = intervalSeconds;
|
||||
_lastKnownDeployTime = initialDeployTime;
|
||||
}
|
||||
|
||||
public void Start()
|
||||
@@ -43,8 +44,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.GalaxyRepository
|
||||
|
||||
private async Task PollLoopAsync(CancellationToken ct)
|
||||
{
|
||||
// First poll always triggers
|
||||
bool firstPoll = true;
|
||||
// If no initial deploy time was provided, first poll triggers unconditionally
|
||||
bool firstPoll = _lastKnownDeployTime == null;
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
|
||||
@@ -35,9 +35,9 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
|
||||
|
||||
try
|
||||
{
|
||||
if (_state == ConnectionState.Disconnected && _config.AutoReconnect)
|
||||
if ((_state == ConnectionState.Disconnected || _state == ConnectionState.Error) && _config.AutoReconnect)
|
||||
{
|
||||
Log.Information("Monitor: connection lost, attempting reconnect");
|
||||
Log.Information("Monitor: connection lost (state={State}), attempting reconnect", _state);
|
||||
await ReconnectAsync();
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -100,7 +100,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
|
||||
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
cts.CancelAfter(TimeSpan.FromSeconds(_config.WriteTimeoutSeconds));
|
||||
cts.Token.Register(() => tcs.TrySetResult(true)); // timeout assumes success
|
||||
cts.Token.Register(() =>
|
||||
{
|
||||
Log.Warning("Write timed out for {Address} after {Timeout}s", fullTagReference, _config.WriteTimeoutSeconds);
|
||||
tcs.TrySetResult(false);
|
||||
});
|
||||
|
||||
return await tcs.Task;
|
||||
}
|
||||
|
||||
@@ -72,15 +72,15 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
VariableNodeCount = 0;
|
||||
ObjectNodeCount = 0;
|
||||
|
||||
// Build lookup: gobject_id → object info
|
||||
var objectMap = hierarchy.ToDictionary(h => h.GobjectId);
|
||||
// Topological sort: ensure parents appear before children regardless of input order
|
||||
var sorted = TopologicalSort(hierarchy);
|
||||
|
||||
// Build lookup: gobject_id → list of attributes
|
||||
var attrsByObject = attributes
|
||||
.GroupBy(a => a.GobjectId)
|
||||
.ToDictionary(g => g.Key, g => g.ToList());
|
||||
|
||||
// Find root objects (those whose parent is not in the hierarchy)
|
||||
// Root folder
|
||||
var rootFolder = CreateFolder(null, "ZB", "ZB");
|
||||
rootFolder.NodeId = new NodeId("ZB", NamespaceIndex);
|
||||
rootFolder.AddReference(ReferenceTypeIds.Organizes, true, ObjectIds.ObjectsFolder);
|
||||
@@ -93,9 +93,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
|
||||
// Create nodes for each object in hierarchy
|
||||
var nodeMap = new Dictionary<int, NodeState>();
|
||||
var parentIds = new HashSet<int>(hierarchy.Select(h => h.ParentGobjectId));
|
||||
|
||||
foreach (var obj in hierarchy)
|
||||
foreach (var obj in sorted)
|
||||
{
|
||||
NodeState parentNode;
|
||||
if (nodeMap.TryGetValue(obj.ParentGobjectId, out var p))
|
||||
@@ -172,6 +171,33 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sorts hierarchy so parents always appear before children, regardless of input order.
|
||||
/// </summary>
|
||||
private static List<GalaxyObjectInfo> TopologicalSort(List<GalaxyObjectInfo> hierarchy)
|
||||
{
|
||||
var byId = hierarchy.ToDictionary(h => h.GobjectId);
|
||||
var knownIds = new HashSet<int>(hierarchy.Select(h => h.GobjectId));
|
||||
var visited = new HashSet<int>();
|
||||
var result = new List<GalaxyObjectInfo>(hierarchy.Count);
|
||||
|
||||
void Visit(GalaxyObjectInfo obj)
|
||||
{
|
||||
if (!visited.Add(obj.GobjectId)) return;
|
||||
|
||||
// Visit parent first if it exists in the hierarchy
|
||||
if (knownIds.Contains(obj.ParentGobjectId) && byId.TryGetValue(obj.ParentGobjectId, out var parent))
|
||||
Visit(parent);
|
||||
|
||||
result.Add(obj);
|
||||
}
|
||||
|
||||
foreach (var obj in hierarchy)
|
||||
Visit(obj);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private void CreateAttributeVariable(NodeState parent, GalaxyAttributeInfo attr)
|
||||
{
|
||||
var opcUaDataTypeId = MxDataTypeMapper.MapToOpcUaDataType(attr.MxDataType);
|
||||
@@ -327,9 +353,41 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
#region Subscription Delivery
|
||||
|
||||
/// <summary>
|
||||
/// Subscribes to MXAccess for the given tag reference. Called by the service wiring layer.
|
||||
/// Called by the OPC UA framework after monitored items are created on nodes in our namespace.
|
||||
/// Triggers ref-counted MXAccess subscriptions for the underlying tags.
|
||||
/// </summary>
|
||||
public void SubscribeTag(string fullTagReference)
|
||||
protected override void OnCreateMonitoredItemsComplete(ServerSystemContext context, IList<IMonitoredItem> monitoredItems)
|
||||
{
|
||||
foreach (var item in monitoredItems)
|
||||
{
|
||||
var nodeIdStr = GetNodeIdString(item);
|
||||
if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
|
||||
SubscribeTag(tagRef);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Called by the OPC UA framework after monitored items are deleted.
|
||||
/// Decrements ref-counted MXAccess subscriptions.
|
||||
/// </summary>
|
||||
protected override void OnDeleteMonitoredItemsComplete(ServerSystemContext context, IList<IMonitoredItem> monitoredItems)
|
||||
{
|
||||
foreach (var item in monitoredItems)
|
||||
{
|
||||
var nodeIdStr = GetNodeIdString(item);
|
||||
if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
|
||||
UnsubscribeTag(tagRef);
|
||||
}
|
||||
}
|
||||
|
||||
private static string? GetNodeIdString(IMonitoredItem item)
|
||||
{
|
||||
if (item.ManagerHandle is NodeState node)
|
||||
return node.NodeId?.Identifier as string;
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void SubscribeTag(string fullTagReference)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
@@ -345,6 +403,25 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
}
|
||||
}
|
||||
|
||||
internal void UnsubscribeTag(string fullTagReference)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_subscriptionRefCounts.TryGetValue(fullTagReference, out var count))
|
||||
{
|
||||
if (count <= 1)
|
||||
{
|
||||
_subscriptionRefCounts.Remove(fullTagReference);
|
||||
_ = _mxAccessClient.UnsubscribeAsync(fullTagReference);
|
||||
}
|
||||
else
|
||||
{
|
||||
_subscriptionRefCounts[fullTagReference] = count - 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void OnMxAccessDataChange(string address, Vtq vtq)
|
||||
{
|
||||
if (_tagToVariableNode.TryGetValue(address, out var variable))
|
||||
|
||||
@@ -140,10 +140,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
|
||||
_serverHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, _metrics);
|
||||
|
||||
// Step 9-10: Query hierarchy, start server, build address space
|
||||
DateTime? initialDeployTime = null;
|
||||
if (_galaxyRepository != null && _galaxyStats.DbConnected)
|
||||
{
|
||||
try
|
||||
{
|
||||
initialDeployTime = _galaxyRepository.GetLastDeployTimeAsync(_cts.Token).GetAwaiter().GetResult();
|
||||
var hierarchy = _galaxyRepository.GetHierarchyAsync(_cts.Token).GetAwaiter().GetResult();
|
||||
var attributes = _galaxyRepository.GetAttributesAsync(_cts.Token).GetAwaiter().GetResult();
|
||||
_galaxyStats.ObjectCount = hierarchy.Count;
|
||||
@@ -177,7 +179,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
|
||||
// Step 11-12: Change detection wired to rebuild
|
||||
if (_galaxyRepository != null)
|
||||
{
|
||||
_changeDetection = new ChangeDetectionService(_galaxyRepository, _config.GalaxyRepository.ChangeDetectionIntervalSeconds);
|
||||
_changeDetection = new ChangeDetectionService(_galaxyRepository, _config.GalaxyRepository.ChangeDetectionIntervalSeconds, initialDeployTime);
|
||||
_changeDetection.OnGalaxyChanged += OnGalaxyChanged;
|
||||
_changeDetection.Start();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user