From c4435e4fd6fa7f882ae356d3047bcaa07e317bba Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 13 Jun 2026 06:32:38 -0400 Subject: [PATCH] feat(runtime): route driver values to folder-scoped equipment NodeIds (live-value delivery) --- .../Drivers/DriverHostActor.cs | 60 +++++- .../Drivers/DriverHostActorLiveValueTests.cs | 194 ++++++++++++++++++ 2 files changed, 244 insertions(+), 10 deletions(-) create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorLiveValueTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 5dd42a03..b8b70427 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -11,6 +11,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; using ZB.MOM.WW.OtOpcUa.Commons.Observability; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Entities; @@ -85,6 +86,16 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private readonly Dictionary _children = new(StringComparer.Ordinal); + /// + /// Driver live-value routing map: (DriverInstanceId, FullName) → folder-scoped equipment + /// NodeId(s). Rebuilt every apply by from the + /// composition's EquipmentTags (mirroring VirtualTagHostActor._nodeIdByVtag), and + /// resolved in so a driver value published by wire-ref FullName lands + /// on the variable's actual folder-scoped NodeId. A list because the same driver ref can back + /// several equipment variables (e.g. identical machines sharing a register). + /// + private readonly Dictionary<(string DriverInstanceId, string FullName), List> _nodeIdByDriverRef = new(); + private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed) { // Convenience accessors for sites that don't need the full spec. @@ -378,18 +389,32 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg) { - // Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs). - // Without a mux, VirtualTagActor evaluation can't fire — that's the dev/Mac path (no virtual - // tags registered); production binds the mux via the RuntimeActors extension. + // Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs, + // keyed by FullReference). Without a mux, VirtualTagActor evaluation can't fire — that's the + // dev/Mac path (no virtual tags registered); production binds the mux via the RuntimeActors + // extension. KEEP this unchanged — VirtualTag inputs are still keyed by FullReference. _dependencyMux?.Tell(msg); - // Also push the value to the OPC UA sink. NOTE: equipment-tag variables are materialised with - // folder-scoped NodeIds (EquipmentId/FolderPath/Name), while the driver publishes keyed by - // FullReference (the tag's FullName). The value therefore only lands on the variable once the - // FullName→NodeId routing — the equipment-tag "live values" milestone — is wired; until then - // the variable stays BadWaitingForInitialData. - _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate( - msg.FullReference, msg.Value, msg.Quality, msg.TimestampUtc)); + if (_opcUaPublishActor is null) return; + + // Route the value to the OPC UA sink at the variable's ACTUAL NodeId. Equipment-tag variables + // are materialised with folder-scoped NodeIds (EquipmentId/FolderPath/Name), while the driver + // publishes keyed by its wire-ref FullName (FullReference). The _nodeIdByDriverRef map — built + // each apply from the composition's EquipmentTags — resolves (DriverInstanceId, FullName) to + // the folder-scoped NodeId(s) the materialiser placed the variable(s) at, so the value lands + // instead of leaving the variable at BadWaitingForInitialData. One driver ref can back several + // equipment variables (identical machines sharing a register), hence the fan-out. + if (_nodeIdByDriverRef.TryGetValue((msg.DriverInstanceId, msg.FullReference), out var nodeIds)) + { + foreach (var nodeId in nodeIds) + _opcUaPublishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate( + nodeId, msg.Value, msg.Quality, msg.TimestampUtc)); + } + else + { + _log.Debug("DriverHost {Node}: no equipment-tag NodeId for ({Driver},{Ref}) — value dropped", + _localNode, msg.DriverInstanceId, msg.FullReference); + } } private void Stale() @@ -592,6 +617,21 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers .ToArray(), StringComparer.Ordinal); + // Rebuild the driver live-value routing map from the SAME EquipmentTags pass (mirrors + // VirtualTagHostActor._nodeIdByVtag): map each tag's (DriverInstanceId, FullName) wire-ref to + // the folder-scoped equipment NodeId the materialiser placed its variable at, so ForwardToMux + // can land driver values on the right node. Clear-and-repopulate every apply so renames + // (Name/FolderPath/EquipmentId changes) and removals are reflected. + _nodeIdByDriverRef.Clear(); + foreach (var t in composition.EquipmentTags) + { + var key = (t.DriverInstanceId, t.FullName); + var nodeId = EquipmentNodeIds.Variable(t.EquipmentId, t.FolderPath, t.Name); + if (!_nodeIdByDriverRef.TryGetValue(key, out var list)) + _nodeIdByDriverRef[key] = list = new List(); + if (!list.Contains(nodeId)) list.Add(nodeId); + } + var total = 0; foreach (var (driverId, entry) in _children) { diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorLiveValueTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorLiveValueTests.cs new file mode 100644 index 00000000..919e0313 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorLiveValueTests.cs @@ -0,0 +1,194 @@ +using System.Text.Json; +using Akka.Actor; +using Microsoft.EntityFrameworkCore; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Verifies the equipment-tag live-value routing wired into : +/// a driver publishes a value keyed by its wire-ref FullName, but the equipment variable was +/// materialised under a FOLDER-SCOPED NodeId ({equipmentId}/{folderPath}/{name}). After an +/// apply, the host's _nodeIdByDriverRef map resolves (DriverInstanceId, FullName) to +/// that folder-scoped NodeId, so ForwardToMux lands the value on the right node (and still +/// forwards the raw value to the dependency mux for VirtualTag inputs). +/// +/// +/// Drives a real apply through the existing harness: the seeded artifact carries the +/// Namespaces / DriverInstances / Tags arrays that +/// DeploymentArtifact.BuildEquipmentTagPlans needs to project equipment tags, and a +/// DispatchDeployment triggers the ApplyAndAck → PushDesiredSubscriptions pass +/// that builds the map. The OPC UA sink and the dependency mux are injected as +/// s. +/// +/// +public sealed class DriverHostActorLiveValueTests : RuntimeActorTestBase +{ + private static readonly NodeId TestNode = NodeId.Parse("driver-lv-test"); + private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64)); + private static readonly DateTime Ts = new(2026, 6, 13, 10, 0, 0, DateTimeKind.Utc); + + /// A driver value published by FullName lands on the equipment variable's folder-scoped + /// NodeId (here eq-1/speed, no sub-folder), carrying the value/quality/timestamp. + [Fact] + public void Driver_value_routes_to_folder_scoped_equipment_NodeId() + { + var db = NewInMemoryDbFactory(); + // One equipment tag: eq-1, drv-1, FullName "40001", no folder, Name "speed". + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed")); + + var (actor, publish, mux) = SpawnHostAndApply(db, deploymentId); + + actor.Tell(new DriverInstanceActor.AttributeValuePublished( + "drv-1", "40001", 42.0, OpcUaQuality.Good, Ts)); + + var update = publish.ExpectMsg(TimeSpan.FromSeconds(5)); + update.NodeId.ShouldBe("eq-1/speed"); + update.Value.ShouldBe(42.0); + update.Quality.ShouldBe(OpcUaQuality.Good); + update.TimestampUtc.ShouldBe(Ts); + + // The raw value is still forwarded to the dependency mux (VirtualTag inputs, keyed by FullName). + mux.ExpectMsg(TimeSpan.FromSeconds(5)) + .FullReference.ShouldBe("40001"); + } + + /// The same driver ref backing two equipments fans out: a single publish produces one + /// per equipment variable NodeId. + [Fact] + public void Same_ref_on_two_equipments_fans_out_to_both_NodeIds() + { + var db = NewInMemoryDbFactory(); + // Same (drv-1, "40001") wire-ref backs eq-1/speed AND eq-2/speed (identical machines). + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed"), + (Equip: "eq-2", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed")); + + var (actor, publish, _) = SpawnHostAndApply(db, deploymentId); + + actor.Tell(new DriverInstanceActor.AttributeValuePublished( + "drv-1", "40001", 7.0, OpcUaQuality.Good, Ts)); + + // One publish → two updates. Assert the SET of NodeIds (order is not contractual). + var first = publish.ExpectMsg(TimeSpan.FromSeconds(5)); + var second = publish.ExpectMsg(TimeSpan.FromSeconds(5)); + new[] { first.NodeId, second.NodeId }.ShouldBe( + new[] { "eq-1/speed", "eq-2/speed" }, ignoreOrder: true); + first.Value.ShouldBe(7.0); + second.Value.ShouldBe(7.0); + } + + /// A value for a ref not in the composition is NOT pushed to the OPC UA sink (no matching + /// NodeId), but is still forwarded to the dependency mux. + [Fact] + public void Unmatched_ref_is_dropped_from_sink_but_still_forwarded_to_mux() + { + var db = NewInMemoryDbFactory(); + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed")); + + var (actor, publish, mux) = SpawnHostAndApply(db, deploymentId); + + actor.Tell(new DriverInstanceActor.AttributeValuePublished( + "drv-1", "59999", 99.0, OpcUaQuality.Good, Ts)); + + // No equipment-tag NodeId for ("drv-1","59999") → nothing reaches the sink. + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + // The raw publish still reaches the dependency mux. + mux.ExpectMsg(TimeSpan.FromSeconds(5)) + .FullReference.ShouldBe("59999"); + } + + /// Spawns the host with publish + mux probes, dispatches the deployment, and waits for the + /// Applied ACK so the apply (and thus the map build in PushDesiredSubscriptions) has completed + /// before the test publishes a value. A VirtualTag-host probe is injected so the real host isn't + /// spawned (it would consume the mux-forwarded publishes otherwise — see SpawnVirtualTagHost). + private (IActorRef Actor, Akka.TestKit.TestProbe Publish, Akka.TestKit.TestProbe Mux) SpawnHostAndApply( + IDbContextFactory db, DeploymentId deploymentId) + { + var coordinator = CreateTestProbe(); + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var vtHost = CreateTestProbe(); + + var actor = Sys.ActorOf(DriverHostActor.Props( + db, TestNode, coordinator.Ref, + localRoles: new HashSet { "driver" }, + dependencyMux: mux.Ref, + opcUaPublishActor: publish.Ref, + virtualTagEvaluator: NullVirtualTagEvaluator.Instance, + virtualTagHostOverride: vtHost.Ref)); + + actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId())); + coordinator.ExpectMsg(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied); + + // RebuildAddressSpace also lands on the publish probe during apply; drain it so the test's + // ExpectMsg assertions see only value updates. + publish.ExpectMsg(TimeSpan.FromSeconds(5)); + + return (actor, publish, mux); + } + + /// + /// Seeds a Sealed deployment whose artifact carries the minimal arrays + /// DeploymentArtifact.BuildEquipmentTagPlans needs to project equipment tags: + /// Namespaces (one Equipment-kind ns), DriverInstances (each driver bound to that + /// ns), and Tags (each with a non-null EquipmentId + a TagConfig blob carrying FullName). + /// + private static DeploymentId SeedDeploymentWithEquipmentTags( + IDbContextFactory db, RevisionHash rev, + params (string Equip, string Driver, string FullName, string? Folder, string Name)[] tags) + { + var driverIds = tags.Select(t => t.Driver).Distinct(StringComparer.Ordinal).ToArray(); + + var artifact = JsonSerializer.SerializeToUtf8Bytes(new + { + Namespaces = new[] + { + new { NamespaceId = "ns-eq", Kind = 0 }, // NamespaceKind.Equipment = 0 + }, + DriverInstances = driverIds.Select(d => new + { + DriverInstanceId = d, + NamespaceId = "ns-eq", + }).ToArray(), + Tags = tags.Select((t, i) => new + { + TagId = $"tag-{i}", + EquipmentId = t.Equip, + DriverInstanceId = t.Driver, + Name = t.Name, + FolderPath = t.Folder, + DataType = "Double", + TagConfig = JsonSerializer.Serialize(new { FullName = t.FullName }), + }).ToArray(), + }); + + var id = DeploymentId.NewId(); + using var ctx = db.CreateDbContext(); + ctx.Deployments.Add(new Deployment + { + DeploymentId = id.Value, + RevisionHash = rev.Value, + Status = DeploymentStatus.Sealed, + CreatedBy = "test", + SealedAtUtc = DateTime.UtcNow, + ArtifactBlob = artifact, + }); + ctx.SaveChanges(); + return id; + } +}