feat(otopcua): OpcUaPublishActor handles discovered-node materialisation

This commit is contained in:
Joseph Doherty
2026-06-26 07:24:22 -04:00
parent 598cdfad5a
commit ccf93fc029
2 changed files with 67 additions and 6 deletions
@@ -56,6 +56,13 @@ public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers
/// fall back to the latest sealed deployment (lags a not-yet-sealed apply by one revision).
/// </summary>
public sealed record RebuildAddressSpace(CorrelationId Correlation, DeploymentId? DeploymentId = null);
/// <summary>Inject driver-discovered nodes (FixedTree) under an equipment at runtime (post-connect).</summary>
public sealed record MaterialiseDiscoveredNodes(
string EquipmentRootNodeId,
IReadOnlyList<DiscoveredFolder> Folders,
IReadOnlyList<DiscoveredVariable> Variables);
public sealed record ServiceLevelChanged(byte ServiceLevel);
private readonly IOpcUaAddressSpaceSink _sink;
@@ -217,6 +224,7 @@ public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers
Receive<AttributeValueUpdate>(HandleAttributeUpdate);
Receive<AlarmStateUpdate>(HandleAlarmUpdate);
Receive<RebuildAddressSpace>(HandleRebuild);
Receive<MaterialiseDiscoveredNodes>(HandleMaterialiseDiscovered);
Receive<ServiceLevelChanged>(HandleServiceLevelChanged);
Receive<RedundancyStateChanged>(HandleRedundancyStateChanged);
Receive<DbHealthProbeActor.DbHealthStatus>(HandleDbHealthStatus);
@@ -390,6 +398,12 @@ public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers
}
}
/// <summary>Forwards driver-discovered (FixedTree) nodes to the applier so they are injected under
/// the equipment at runtime. No-op when no applier is wired (dev/Mac/legacy seam), matching the
/// optional-applier tolerance of <see cref="HandleRebuild"/>.</summary>
private void HandleMaterialiseDiscovered(MaterialiseDiscoveredNodes msg)
=> _applier?.MaterialiseDiscoveredNodes(msg.EquipmentRootNodeId, msg.Folders, msg.Variables);
private void HandleServiceLevelChanged(ServiceLevelChanged msg)
{
// Always publish the FIRST computed level, even if it equals the byte-default 0. Otherwise a
@@ -1,10 +1,12 @@
using System.Collections.Concurrent;
using Akka.Actor;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
@@ -98,6 +100,35 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies that <see cref="OpcUaPublishActor.MaterialiseDiscoveredNodes"/> forwards to the
/// applier, which drives the sink to ensure the discovered folder + (read-only) variable and announce a
/// NodeAdded model-change under the equipment root — proving the message → handler → applier → sink path
/// end to end (mirrors the real-applier-over-recording-sink harness in
/// <c>OpcUaPublishActorRebuildTests</c>).</summary>
[Fact]
public void MaterialiseDiscoveredNodes_routes_through_applier_to_sink()
{
var sink = new RecordingSink();
var applier = new AddressSpaceApplier(sink, NullLogger<AddressSpaceApplier>.Instance);
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink, applier: applier));
var folders = new[] { new DiscoveredFolder("EQ-1/Axes", "EQ-1", "Axes") };
var variables = new[]
{
new DiscoveredVariable("EQ-1/Axes/X", "EQ-1/Axes", "X", "Double",
Writable: false, IsArray: false, ArrayLength: null),
};
actor.Tell(new OpcUaPublishActor.MaterialiseDiscoveredNodes("EQ-1", folders, variables));
AwaitAssert(() =>
{
sink.Folders.ShouldContain(("EQ-1/Axes", "EQ-1", "Axes"));
sink.Variables.ShouldContain(("EQ-1/Axes/X", "EQ-1/Axes", "X", "Double", false));
sink.ModelChanges.ShouldContain("EQ-1");
}, duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies that ServiceLevelChanged publishes to IServiceLevelPublisher once per unique level.</summary>
[Fact]
public void ServiceLevelChanged_publishes_to_IServiceLevelPublisher_once_per_unique_level()
@@ -548,6 +579,12 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
public ConcurrentQueue<(string AlarmNodeId, AlarmConditionSnapshot State, DateTime Ts)> AlarmQueue { get; } = new();
/// <summary>Count of rebuild calls.</summary>
public int RebuildCalls;
/// <summary>Gets the queue of recorded EnsureFolder calls.</summary>
public ConcurrentQueue<(string NodeId, string? ParentNodeId, string DisplayName)> FolderQueue { get; } = new();
/// <summary>Gets the queue of recorded EnsureVariable calls.</summary>
public ConcurrentQueue<(string NodeId, string? ParentNodeId, string DisplayName, string DataType, bool Writable)> VariableQueue { get; } = new();
/// <summary>Gets the queue of recorded RaiseNodesAddedModelChange announcements.</summary>
public ConcurrentQueue<string> ModelChangeQueue { get; } = new();
/// <summary>Gets the list of recorded value updates.</summary>
public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values =>
@@ -555,6 +592,14 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
/// <summary>Gets the list of recorded alarm condition updates.</summary>
public List<(string AlarmNodeId, AlarmConditionSnapshot State, DateTime Ts)> Alarms =>
AlarmQueue.ToList();
/// <summary>Gets the list of recorded EnsureFolder calls.</summary>
public List<(string NodeId, string? ParentNodeId, string DisplayName)> Folders =>
FolderQueue.ToList();
/// <summary>Gets the list of recorded EnsureVariable calls.</summary>
public List<(string NodeId, string? ParentNodeId, string DisplayName, string DataType, bool Writable)> Variables =>
VariableQueue.ToList();
/// <summary>Gets the list of recorded RaiseNodesAddedModelChange announcements.</summary>
public List<string> ModelChanges => ModelChangeQueue.ToList();
/// <summary>Records a value update.</summary>
/// <param name="nodeId">The OPC UA node identifier.</param>
@@ -579,27 +624,29 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
/// <param name="severity">The domain severity.</param>
public void MaterialiseAlarmCondition(string alarmNodeId, string equipmentNodeId, string displayName, string alarmType, int severity, bool isNative = false) { }
/// <summary>Ensures a folder exists (no-op in test).</summary>
/// <summary>Records a folder ensure call.</summary>
/// <param name="folderNodeId">The OPC UA folder node identifier.</param>
/// <param name="parentNodeId">The parent folder node identifier, or null for root.</param>
/// <param name="displayName">The display name of the folder.</param>
public void EnsureFolder(string folderNodeId, string? parentNodeId, string displayName) { }
public void EnsureFolder(string folderNodeId, string? parentNodeId, string displayName) =>
FolderQueue.Enqueue((folderNodeId, parentNodeId, displayName));
/// <summary>Ensures a variable exists (no-op in test).</summary>
/// <summary>Records a variable ensure call.</summary>
/// <param name="variableNodeId">The OPC UA variable node identifier.</param>
/// <param name="parentFolderNodeId">The parent folder node identifier, or null for root.</param>
/// <param name="displayName">The display name of the variable.</param>
/// <param name="dataType">The OPC UA built-in type name.</param>
/// <param name="writable">Whether the node is created read/write.</param>
/// <param name="historianTagname">The resolved historian tagname (null ⇒ not historized).</param>
public void EnsureVariable(string variableNodeId, string? parentFolderNodeId, string displayName, string dataType, bool writable, string? historianTagname = null, bool isArray = false, uint? arrayLength = null) { }
public void EnsureVariable(string variableNodeId, string? parentFolderNodeId, string displayName, string dataType, bool writable, string? historianTagname = null, bool isArray = false, uint? arrayLength = null) =>
VariableQueue.Enqueue((variableNodeId, parentFolderNodeId, displayName, dataType, writable));
/// <summary>Records a rebuild call.</summary>
public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls);
/// <summary>Announces a NodeAdded model-change (no-op in test).</summary>
/// <summary>Records a NodeAdded model-change announcement.</summary>
/// <param name="affectedNodeId">The node under which discovered nodes were added.</param>
public void RaiseNodesAddedModelChange(string affectedNodeId) { }
public void RaiseNodesAddedModelChange(string affectedNodeId) => ModelChangeQueue.Enqueue(affectedNodeId);
}
/// <summary>Test implementation of IServiceLevelPublisher that records publishes.</summary>