From b9b8d3d389146f1a97f05eda6eb7a3c9d6b25870 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 07:59:01 -0400 Subject: [PATCH] feat(otopcua): inject discovered nodes into the equipment projection on connect --- .../Drivers/DriverHostActor.cs | 127 ++++++++++ .../Drivers/DriverHostActorDiscoveryTests.cs | 219 ++++++++++++++++++ 2 files changed, 346 insertions(+) create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 519319b4..dc4774e2 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -153,6 +153,20 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// value maps so stale condition state never leaks across redeploys. private readonly NativeAlarmProjector _nativeAlarmProjector = new(); + /// The composition from the most-recent apply (set at the END of + /// ). Discovered-node injection + /// () reads it to resolve the equipment bound to a driver (via the + /// authored EquipmentTags, since EquipmentNode carries no DriverInstanceId) and to + /// recompute the authored value + alarm subscription sets when merging FixedTree refs. Null until the + /// first apply — a arriving before any apply is + /// ignored. + private AddressSpaceComposition? _lastComposition; + + /// The most-recent discovered-injection plan per driver instance, cached so Task 8 (the + /// address-space cache rebuild) can re-apply the live graft after a rebuild without re-running + /// discovery. Keyed by DriverInstanceId; last-writer-wins on a re-discovery. + private readonly Dictionary _discoveredByDriver = new(StringComparer.Ordinal); + /// /// Cached local from the latest /// snapshot (null = unknown until the first snapshot arrives, or no local node match). The inbound @@ -483,6 +497,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(HandleGetDiagnostics); Receive(ForwardToMux); Receive(ForwardNativeAlarm); + Receive(HandleDiscoveredNodes); Receive(HandleRestartDriver); Receive(HandleReconnectDriver); Receive(HandleRouteNodeWrite); @@ -511,6 +526,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(HandleGetDiagnostics); Receive(ForwardToMux); Receive(ForwardNativeAlarm); + Receive(HandleDiscoveredNodes); Receive(HandleRestartDriver); Receive(HandleReconnectDriver); Receive(HandleRouteNodeWrite); @@ -552,6 +568,111 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers } } + /// + /// Handles a driver child's post-connect : + /// resolves the equipment the driver is bound to from the most-recent applied composition, maps the + /// captured FixedTree under it via (deduping any node that shadows + /// an authored equipment-tag ref), caches the plan, and grafts it onto the served address space + + /// live-value maps + subscription set via . Idempotent / + /// duplicate-safe: the mapper is pure, materialisation is idempotent, and the routing-map extension + + /// subscription merge are set-based. + /// + private void HandleDiscoveredNodes(DriverInstanceActor.DiscoveredNodesReady msg) + { + if (_lastComposition is null) + { + _log.Debug("DriverHost {Node}: DiscoveredNodesReady from {Driver} before any composition applied — ignored", + _localNode, msg.DriverInstanceId); + return; + } + + // Resolve the equipment bound to this driver via the authored equipment tags. EquipmentNode carries + // no DriverInstanceId, so a driver's discovered FixedTree can only be grafted when the driver has + // >=1 authored equipment tag (which is how the equipment is resolved). The FOCAS z-34184 deploy has + // authored tags, so this holds today; extending the EquipmentNode projection with DriverInstanceId to + // drop this requirement is a follow-up. + var equipmentIds = _lastComposition.EquipmentTags + .Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal)) + .Select(t => t.EquipmentId) + .Distinct(StringComparer.Ordinal) + .ToList(); + if (equipmentIds.Count == 0) + { + _log.Info("DriverHost {Node}: no equipment/authored tags for driver {Driver} — skipping discovered-node injection", + _localNode, msg.DriverInstanceId); + return; + } + if (equipmentIds.Count > 1) + { + _log.Warning("DriverHost {Node}: driver {Driver} maps to {Count} equipments — discovered-node injection skipped (multi-equipment-per-driver is a follow-up)", + _localNode, msg.DriverInstanceId, equipmentIds.Count); + return; + } + var equipmentId = equipmentIds[0]; + + // Authored refs for THIS driver (both value + alarm tags) so a discovered node never shadows an + // authored one — the mapper drops any captured node whose FullReference is already authored. + var authoredRefs = _lastComposition.EquipmentTags + .Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal)) + .Select(t => t.FullName) + .ToHashSet(StringComparer.Ordinal); + + var plan = DiscoveredNodeMapper.Map(equipmentId, msg.Nodes, authoredRefs); + if (plan.Variables.Count == 0) return; // nothing new to inject (all captured nodes were authored) + + _discoveredByDriver[msg.DriverInstanceId] = plan; + ApplyDiscoveredPlan(msg.DriverInstanceId, equipmentId, plan); + } + + /// + /// Grafts a onto the served state: extends the live-value + /// routing map (mirroring ' fan-out so + /// lands FixedTree values on the right node), materialises the discovered folders + variables under + /// the equipment (idempotent), and merges the FixedTree refs into the driver's desired subscription + /// set (recomputing the authored value + alarm refs the same way + /// does, then unioning the FixedTree refs onto the value set) so the poll engine reads them and the + /// values flow. Extracted as a standalone method so Task 8 (the address-space cache rebuild) can + /// re-apply the cached plan after a rebuild without re-running discovery. + /// + private void ApplyDiscoveredPlan(string driverId, string equipmentId, DiscoveredInjectionPlan plan) + { + // Extend the live-value routing map (fan-out), mirroring PushDesiredSubscriptions' pattern. + foreach (var (driverRef, nodeId) in plan.RoutingByRef) + { + var key = (driverId, driverRef); + if (!_nodeIdByDriverRef.TryGetValue(key, out var set)) + _nodeIdByDriverRef[key] = set = new HashSet(StringComparer.Ordinal); + set.Add(nodeId); + _driverRefByNodeId[nodeId] = key; + } + + // Materialise the discovered folders + variables under the equipment (idempotent). + _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes( + equipmentId, plan.Folders, plan.Variables)); + + // Merge the FixedTree refs into the driver's desired subscription set so the poll engine reads them + // and ForwardToMux routes the values. Recompute the authored value + alarm refs the same way + // PushDesiredSubscriptions does, then union the FixedTree refs onto the value set. + if (!_children.TryGetValue(driverId, out var entry)) return; + var authoredValueRefs = _lastComposition is null + ? Enumerable.Empty() + : _lastComposition.EquipmentTags + .Where(t => t.Alarm is null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal)) + .Select(t => t.FullName); + var alarmRefs = _lastComposition is null + ? Array.Empty() + : _lastComposition.EquipmentTags + .Where(t => t.Alarm is not null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal)) + .Select(t => t.FullName) + .Distinct(StringComparer.Ordinal) + .ToArray(); + var union = authoredValueRefs.Concat(plan.RoutingByRef.Keys).Distinct(StringComparer.Ordinal).ToArray(); + entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(union, SubscriptionPublishingInterval, alarmRefs)); + + _log.Info("DriverHost {Node}: injected {Count} discovered node(s) for driver {Driver} under {Equipment}", + _localNode, plan.Variables.Count, driverId, equipmentId); + } + /// /// Routes a native alarm transition (published by a driver child as /// ) to its materialised Part 9 condition @@ -1085,6 +1206,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _log.Info("DriverHost {Node}: applied {Count} Equipment ScriptedAlarm(s) to the ScriptedAlarm host", _localNode, composition.EquipmentScriptedAlarms.Count); } + + // Cache the applied composition LAST so discovered-node injection (HandleDiscoveredNodes) can resolve + // the equipment bound to a driver + recompute the authored subscription sets when a driver later + // reports its FixedTree. Set here (not in ApplyAndAck) so both the fresh-apply and bootstrap-restore + // paths — which both route through this method — leave a current composition. + _lastComposition = composition; } private void SpawnChild(DriverInstanceSpec spec) diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs new file mode 100644 index 00000000..873c3c27 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs @@ -0,0 +1,219 @@ +using System.Text.Json; +using Akka.Actor; +using Microsoft.EntityFrameworkCore; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Verifies the discovered-node injection wired into (Task 7): when a +/// driver child reports a captured FixedTree via , +/// the host resolves the bound equipment from the authored composition, maps the nodes under it via +/// , materialises them on the OPC UA publish side +/// (), extends the live-value routing map +/// (_nodeIdByDriverRef), and merges the FixedTree refs into the driver's desired subscription set +/// (). +/// +/// +/// Drives a real apply through the existing harness (same artifact shape as +/// DriverHostActorLiveValueTests / DriverHostActorWriteRoutingTests) so +/// _lastComposition is set and a real (non-stubbed) child +/// is spawned for d1. The child is backed by the shared +/// (records LastSubscribedRefs/SubscribeCount, exactly as +/// DriverInstanceActorTests asserts) so the merged subscription is observable; the OPC UA +/// publish actor is a (as in +/// DriverHostActorLiveValueTests) so the materialise + the post-injection value route are +/// observable. There is no test seam to inject a probe AS a driver child, so this is the faithful +/// end-to-end approach the harness allows. +/// +/// +[Trait("Category", "Unit")] +public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase +{ + private static readonly NodeId TestNode = NodeId.Parse("driver-disc-test"); + private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64)); + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); + private static readonly DateTime Ts = new(2026, 6, 26, 10, 0, 0, DateTimeKind.Utc); + + /// A driver's discovered FixedTree (refs differing from the authored tag) is grafted under the + /// bound equipment: (a) the publish side receives + /// rooted at the equipment NodeId; (b) the driver re-subscribes the UNION of the authored ref + the + /// FixedTree refs; (c) a value published for a FixedTree ref now routes to its mapped NodeId (proving the + /// live-value routing map was extended). + [Fact] + public void DiscoveredNodes_materialise_extend_routing_and_merge_subscription() + { + var db = NewInMemoryDbFactory(); + var factory = new SubscribingDriverFactory("Modbus"); + // One authored value tag: equipment EQ-1, driver d1, FullName "40001" — this both sets + // _lastComposition AND binds d1 → EQ-1 (the only way the equipment is resolved, since EquipmentNode + // carries no DriverInstanceId). + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed")); + + var (actor, publish) = SpawnHostAndApply(db, deploymentId, factory); + + // A FixedTree discovered node whose FullReference DIFFERS from the authored tag's FullName, so the + // mapper keeps it (it does not shadow an authored ref). + var discovered = new[] + { + new DiscoveredNode( + FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" }, + BrowseName: "Model", + DisplayName: "Model", + FullReference: "ft-ref-1", + DataType: DriverDataType.Float64, + IsArray: false, + ArrayDim: null, + Writable: false, + IsHistorized: false), + }; + + actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", discovered)); + + // (a) The publish side materialises the discovered folders + variables UNDER the equipment root "EQ-1". + var materialise = publish.ExpectMsg(Timeout); + materialise.EquipmentRootNodeId.ShouldBe("EQ-1"); + materialise.Variables.Count.ShouldBe(1); + materialise.Folders.Count.ShouldBeGreaterThan(0); + var fixedTreeNodeId = materialise.Variables[0].NodeId; + + // (b) The driver re-subscribed the UNION of the authored value ref AND the FixedTree ref. The union + // push is the LAST SetDesiredSubscriptions, so the most recent subscribe carries both. + AwaitAssert(() => + { + var refs = factory.LastSubscribedRefs; + refs.ShouldNotBeNull(); + refs!.ShouldContain("40001"); + refs.ShouldContain("ft-ref-1"); + }, duration: Timeout); + + // (c) A value published for the FixedTree ref now routes to the mapped FixedTree NodeId — proving the + // _nodeIdByDriverRef live-value map was extended by the injection. + actor.Tell(new DriverInstanceActor.AttributeValuePublished( + "d1", "ft-ref-1", 42.0, OpcUaQuality.Good, Ts)); + + var update = publish.ExpectMsg(Timeout); + update.NodeId.ShouldBe(fixedTreeNodeId); + update.Value.ShouldBe(42.0); + update.Quality.ShouldBe(OpcUaQuality.Good); + update.TimestampUtc.ShouldBe(Ts); + } + + /// Spawns the host with the subscribing driver factory + a publish probe, dispatches the + /// deployment, and waits for the Applied ACK so the apply (and thus _lastComposition + the live + /// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A + /// VirtualTag-host probe is injected so the real host isn't spawned. The + /// that lands on the publish probe during apply is drained so the test's materialise / value-update + /// assertions see only post-apply traffic. + private (IActorRef Actor, Akka.TestKit.TestProbe Publish) SpawnHostAndApply( + IDbContextFactory db, DeploymentId deploymentId, IDriverFactory factory) + { + var coordinator = CreateTestProbe(); + var publish = CreateTestProbe(); + var vtHost = CreateTestProbe(); + + var actor = Sys.ActorOf(DriverHostActor.Props( + db, TestNode, coordinator.Ref, + driverFactory: factory, + localRoles: new HashSet { "driver" }, + opcUaPublishActor: publish.Ref, + virtualTagHostOverride: vtHost.Ref)); + + actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId())); + coordinator.ExpectMsg(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied); + + publish.ExpectMsg(Timeout); + + return (actor, publish); + } + + /// + /// Seeds a Sealed deployment whose artifact carries the minimal arrays + /// DeploymentArtifact.BuildEquipmentTagPlans needs to project equipment tags, plus a + /// DriverInstances row with a non-Windows-only DriverType ("Modbus") + Enabled flag so + /// a REAL (non-stubbed) child is spawned (mirrors + /// DriverHostActorWriteRoutingTests.SeedDeploymentWithEquipmentTags). + /// + private static DeploymentId SeedDeploymentWithEquipmentTags( + IDbContextFactory db, RevisionHash rev, + params (string Equip, string Driver, string FullName, string? Folder, string Name)[] tags) + { + var driverIds = tags.Select(t => t.Driver).Distinct(StringComparer.Ordinal).ToArray(); + + var artifact = JsonSerializer.SerializeToUtf8Bytes(new + { + Namespaces = new[] + { + new { NamespaceId = "ns-eq", Kind = 0 }, // NamespaceKind.Equipment = 0 + }, + DriverInstances = driverIds.Select(d => new + { + DriverInstanceRowId = Guid.NewGuid(), + DriverInstanceId = d, + Name = d, + DriverType = "Modbus", // not Windows-only ⇒ a real child is spawned (not stubbed) + Enabled = true, + DriverConfig = "{}", + NamespaceId = "ns-eq", + }).ToArray(), + Tags = tags.Select((t, i) => new + { + TagId = $"tag-{i}", + EquipmentId = t.Equip, + DriverInstanceId = t.Driver, + Name = t.Name, + FolderPath = t.Folder, + DataType = "Double", + TagConfig = JsonSerializer.Serialize(new { FullName = t.FullName }), + }).ToArray(), + }); + + var id = DeploymentId.NewId(); + using var ctx = db.CreateDbContext(); + ctx.Deployments.Add(new Deployment + { + DeploymentId = id.Value, + RevisionHash = rev.Value, + Status = DeploymentStatus.Sealed, + CreatedBy = "test", + SealedAtUtc = DateTime.UtcNow, + ArtifactBlob = artifact, + }); + ctx.SaveChanges(); + return id; + } + + /// Factory producing a single shared for the supported + /// type, exposing its most-recent subscribed reference set for assertions (mirrors + /// DriverHostActorWriteRoutingTests.RecordingDriverFactory, but the driver is + /// so the merged subscription is observable). + private sealed class SubscribingDriverFactory : IDriverFactory + { + private readonly string _supportedType; + private readonly SubscribableStubDriver _driver = new(); + public SubscribingDriverFactory(string supportedType) { _supportedType = supportedType; } + + /// The reference set passed to the driver's most recent SubscribeAsync call. + public IReadOnlyList? LastSubscribedRefs => _driver.LastSubscribedRefs; + + /// + public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson) => + string.Equals(driverType, _supportedType, StringComparison.Ordinal) ? _driver : null; + + /// + public IReadOnlyCollection SupportedTypes => new[] { _supportedType }; + } +}