From ccf93fc0292564700eb77c2109990b8cb3f05a2c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 07:24:22 -0400 Subject: [PATCH] feat(otopcua): OpcUaPublishActor handles discovered-node materialisation --- .../OpcUa/OpcUaPublishActor.cs | 14 +++++ .../OpcUa/OpcUaPublishActorTests.cs | 59 +++++++++++++++++-- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index 40259377..f069a77f 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -56,6 +56,13 @@ public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers /// fall back to the latest sealed deployment (lags a not-yet-sealed apply by one revision). /// public sealed record RebuildAddressSpace(CorrelationId Correlation, DeploymentId? DeploymentId = null); + + /// Inject driver-discovered nodes (FixedTree) under an equipment at runtime (post-connect). + public sealed record MaterialiseDiscoveredNodes( + string EquipmentRootNodeId, + IReadOnlyList Folders, + IReadOnlyList Variables); + public sealed record ServiceLevelChanged(byte ServiceLevel); private readonly IOpcUaAddressSpaceSink _sink; @@ -217,6 +224,7 @@ public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers Receive(HandleAttributeUpdate); Receive(HandleAlarmUpdate); Receive(HandleRebuild); + Receive(HandleMaterialiseDiscovered); Receive(HandleServiceLevelChanged); Receive(HandleRedundancyStateChanged); Receive(HandleDbHealthStatus); @@ -390,6 +398,12 @@ public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers } } + /// Forwards driver-discovered (FixedTree) nodes to the applier so they are injected under + /// the equipment at runtime. No-op when no applier is wired (dev/Mac/legacy seam), matching the + /// optional-applier tolerance of . + private void HandleMaterialiseDiscovered(MaterialiseDiscoveredNodes msg) + => _applier?.MaterialiseDiscoveredNodes(msg.EquipmentRootNodeId, msg.Folders, msg.Variables); + private void HandleServiceLevelChanged(ServiceLevelChanged msg) { // Always publish the FIRST computed level, even if it equals the byte-default 0. Otherwise a diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs index 26215dd3..e903b714 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs @@ -1,10 +1,12 @@ using System.Collections.Concurrent; using Akka.Actor; +using Microsoft.Extensions.Logging.Abstractions; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; using ZB.MOM.WW.OtOpcUa.Runtime.Health; using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; @@ -98,6 +100,35 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500)); } + /// Verifies that forwards to the + /// applier, which drives the sink to ensure the discovered folder + (read-only) variable and announce a + /// NodeAdded model-change under the equipment root — proving the message → handler → applier → sink path + /// end to end (mirrors the real-applier-over-recording-sink harness in + /// OpcUaPublishActorRebuildTests). + [Fact] + public void MaterialiseDiscoveredNodes_routes_through_applier_to_sink() + { + var sink = new RecordingSink(); + var applier = new AddressSpaceApplier(sink, NullLogger.Instance); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink, applier: applier)); + + var folders = new[] { new DiscoveredFolder("EQ-1/Axes", "EQ-1", "Axes") }; + var variables = new[] + { + new DiscoveredVariable("EQ-1/Axes/X", "EQ-1/Axes", "X", "Double", + Writable: false, IsArray: false, ArrayLength: null), + }; + + actor.Tell(new OpcUaPublishActor.MaterialiseDiscoveredNodes("EQ-1", folders, variables)); + + AwaitAssert(() => + { + sink.Folders.ShouldContain(("EQ-1/Axes", "EQ-1", "Axes")); + sink.Variables.ShouldContain(("EQ-1/Axes/X", "EQ-1/Axes", "X", "Double", false)); + sink.ModelChanges.ShouldContain("EQ-1"); + }, duration: TimeSpan.FromMilliseconds(500)); + } + /// Verifies that ServiceLevelChanged publishes to IServiceLevelPublisher once per unique level. [Fact] public void ServiceLevelChanged_publishes_to_IServiceLevelPublisher_once_per_unique_level() @@ -548,6 +579,12 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase public ConcurrentQueue<(string AlarmNodeId, AlarmConditionSnapshot State, DateTime Ts)> AlarmQueue { get; } = new(); /// Count of rebuild calls. public int RebuildCalls; + /// Gets the queue of recorded EnsureFolder calls. + public ConcurrentQueue<(string NodeId, string? ParentNodeId, string DisplayName)> FolderQueue { get; } = new(); + /// Gets the queue of recorded EnsureVariable calls. + public ConcurrentQueue<(string NodeId, string? ParentNodeId, string DisplayName, string DataType, bool Writable)> VariableQueue { get; } = new(); + /// Gets the queue of recorded RaiseNodesAddedModelChange announcements. + public ConcurrentQueue ModelChangeQueue { get; } = new(); /// Gets the list of recorded value updates. public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values => @@ -555,6 +592,14 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase /// Gets the list of recorded alarm condition updates. public List<(string AlarmNodeId, AlarmConditionSnapshot State, DateTime Ts)> Alarms => AlarmQueue.ToList(); + /// Gets the list of recorded EnsureFolder calls. + public List<(string NodeId, string? ParentNodeId, string DisplayName)> Folders => + FolderQueue.ToList(); + /// Gets the list of recorded EnsureVariable calls. + public List<(string NodeId, string? ParentNodeId, string DisplayName, string DataType, bool Writable)> Variables => + VariableQueue.ToList(); + /// Gets the list of recorded RaiseNodesAddedModelChange announcements. + public List ModelChanges => ModelChangeQueue.ToList(); /// Records a value update. /// The OPC UA node identifier. @@ -579,27 +624,29 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase /// The domain severity. public void MaterialiseAlarmCondition(string alarmNodeId, string equipmentNodeId, string displayName, string alarmType, int severity, bool isNative = false) { } - /// Ensures a folder exists (no-op in test). + /// Records a folder ensure call. /// The OPC UA folder node identifier. /// The parent folder node identifier, or null for root. /// The display name of the folder. - public void EnsureFolder(string folderNodeId, string? parentNodeId, string displayName) { } + public void EnsureFolder(string folderNodeId, string? parentNodeId, string displayName) => + FolderQueue.Enqueue((folderNodeId, parentNodeId, displayName)); - /// Ensures a variable exists (no-op in test). + /// Records a variable ensure call. /// The OPC UA variable node identifier. /// The parent folder node identifier, or null for root. /// The display name of the variable. /// The OPC UA built-in type name. /// Whether the node is created read/write. /// The resolved historian tagname (null ⇒ not historized). - public void EnsureVariable(string variableNodeId, string? parentFolderNodeId, string displayName, string dataType, bool writable, string? historianTagname = null, bool isArray = false, uint? arrayLength = null) { } + public void EnsureVariable(string variableNodeId, string? parentFolderNodeId, string displayName, string dataType, bool writable, string? historianTagname = null, bool isArray = false, uint? arrayLength = null) => + VariableQueue.Enqueue((variableNodeId, parentFolderNodeId, displayName, dataType, writable)); /// Records a rebuild call. public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls); - /// Announces a NodeAdded model-change (no-op in test). + /// Records a NodeAdded model-change announcement. /// The node under which discovered nodes were added. - public void RaiseNodesAddedModelChange(string affectedNodeId) { } + public void RaiseNodesAddedModelChange(string affectedNodeId) => ModelChangeQueue.Enqueue(affectedNodeId); } /// Test implementation of IServiceLevelPublisher that records publishes.