feat(runtime): route driver values to folder-scoped equipment NodeIds (live-value delivery)
v2-ci / build (push) Failing after 44s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
v2-ci / build (push) Failing after 44s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
This commit is contained in:
@@ -11,6 +11,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
|||||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
@@ -85,6 +86,16 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
|
|
||||||
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
|
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Driver live-value routing map: <c>(DriverInstanceId, FullName) → folder-scoped equipment
|
||||||
|
/// NodeId(s)</c>. Rebuilt every apply by <see cref="PushDesiredSubscriptions"/> from the
|
||||||
|
/// composition's <c>EquipmentTags</c> (mirroring <c>VirtualTagHostActor._nodeIdByVtag</c>), and
|
||||||
|
/// resolved in <see cref="ForwardToMux"/> so a driver value published by wire-ref FullName lands
|
||||||
|
/// on the variable's actual folder-scoped NodeId. A list because the same driver ref can back
|
||||||
|
/// several equipment variables (e.g. identical machines sharing a register).
|
||||||
|
/// </summary>
|
||||||
|
private readonly Dictionary<(string DriverInstanceId, string FullName), List<string>> _nodeIdByDriverRef = new();
|
||||||
|
|
||||||
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
|
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
|
||||||
{
|
{
|
||||||
// Convenience accessors for sites that don't need the full spec.
|
// Convenience accessors for sites that don't need the full spec.
|
||||||
@@ -378,18 +389,32 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
|
|
||||||
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
|
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
|
||||||
{
|
{
|
||||||
// Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs).
|
// Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs,
|
||||||
// Without a mux, VirtualTagActor evaluation can't fire — that's the dev/Mac path (no virtual
|
// keyed by FullReference). Without a mux, VirtualTagActor evaluation can't fire — that's the
|
||||||
// tags registered); production binds the mux via the RuntimeActors extension.
|
// dev/Mac path (no virtual tags registered); production binds the mux via the RuntimeActors
|
||||||
|
// extension. KEEP this unchanged — VirtualTag inputs are still keyed by FullReference.
|
||||||
_dependencyMux?.Tell(msg);
|
_dependencyMux?.Tell(msg);
|
||||||
|
|
||||||
// Also push the value to the OPC UA sink. NOTE: equipment-tag variables are materialised with
|
if (_opcUaPublishActor is null) return;
|
||||||
// folder-scoped NodeIds (EquipmentId/FolderPath/Name), while the driver publishes keyed by
|
|
||||||
// FullReference (the tag's FullName). The value therefore only lands on the variable once the
|
// Route the value to the OPC UA sink at the variable's ACTUAL NodeId. Equipment-tag variables
|
||||||
// FullName→NodeId routing — the equipment-tag "live values" milestone — is wired; until then
|
// are materialised with folder-scoped NodeIds (EquipmentId/FolderPath/Name), while the driver
|
||||||
// the variable stays BadWaitingForInitialData.
|
// publishes keyed by its wire-ref FullName (FullReference). The _nodeIdByDriverRef map — built
|
||||||
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
|
// each apply from the composition's EquipmentTags — resolves (DriverInstanceId, FullName) to
|
||||||
msg.FullReference, msg.Value, msg.Quality, msg.TimestampUtc));
|
// the folder-scoped NodeId(s) the materialiser placed the variable(s) at, so the value lands
|
||||||
|
// instead of leaving the variable at BadWaitingForInitialData. One driver ref can back several
|
||||||
|
// equipment variables (identical machines sharing a register), hence the fan-out.
|
||||||
|
if (_nodeIdByDriverRef.TryGetValue((msg.DriverInstanceId, msg.FullReference), out var nodeIds))
|
||||||
|
{
|
||||||
|
foreach (var nodeId in nodeIds)
|
||||||
|
_opcUaPublishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
|
||||||
|
nodeId, msg.Value, msg.Quality, msg.TimestampUtc));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_log.Debug("DriverHost {Node}: no equipment-tag NodeId for ({Driver},{Ref}) — value dropped",
|
||||||
|
_localNode, msg.DriverInstanceId, msg.FullReference);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void Stale()
|
private void Stale()
|
||||||
@@ -592,6 +617,21 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
.ToArray(),
|
.ToArray(),
|
||||||
StringComparer.Ordinal);
|
StringComparer.Ordinal);
|
||||||
|
|
||||||
|
// Rebuild the driver live-value routing map from the SAME EquipmentTags pass (mirrors
|
||||||
|
// VirtualTagHostActor._nodeIdByVtag): map each tag's (DriverInstanceId, FullName) wire-ref to
|
||||||
|
// the folder-scoped equipment NodeId the materialiser placed its variable at, so ForwardToMux
|
||||||
|
// can land driver values on the right node. Clear-and-repopulate every apply so renames
|
||||||
|
// (Name/FolderPath/EquipmentId changes) and removals are reflected.
|
||||||
|
_nodeIdByDriverRef.Clear();
|
||||||
|
foreach (var t in composition.EquipmentTags)
|
||||||
|
{
|
||||||
|
var key = (t.DriverInstanceId, t.FullName);
|
||||||
|
var nodeId = EquipmentNodeIds.Variable(t.EquipmentId, t.FolderPath, t.Name);
|
||||||
|
if (!_nodeIdByDriverRef.TryGetValue(key, out var list))
|
||||||
|
_nodeIdByDriverRef[key] = list = new List<string>();
|
||||||
|
if (!list.Contains(nodeId)) list.Add(nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
var total = 0;
|
var total = 0;
|
||||||
foreach (var (driverId, entry) in _children)
|
foreach (var (driverId, entry) in _children)
|
||||||
{
|
{
|
||||||
|
|||||||
+194
@@ -0,0 +1,194 @@
|
|||||||
|
using System.Text.Json;
|
||||||
|
using Akka.Actor;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Verifies the equipment-tag <b>live-value routing</b> wired into <see cref="DriverHostActor"/>:
|
||||||
|
/// a driver publishes a value keyed by its wire-ref <c>FullName</c>, but the equipment variable was
|
||||||
|
/// materialised under a FOLDER-SCOPED NodeId (<c>{equipmentId}/{folderPath}/{name}</c>). After an
|
||||||
|
/// apply, the host's <c>_nodeIdByDriverRef</c> map resolves <c>(DriverInstanceId, FullName)</c> to
|
||||||
|
/// that folder-scoped NodeId, so <c>ForwardToMux</c> lands the value on the right node (and still
|
||||||
|
/// forwards the raw value to the dependency mux for VirtualTag inputs).
|
||||||
|
///
|
||||||
|
/// <para>
|
||||||
|
/// Drives a real apply through the existing harness: the seeded artifact carries the
|
||||||
|
/// <c>Namespaces</c> / <c>DriverInstances</c> / <c>Tags</c> arrays that
|
||||||
|
/// <c>DeploymentArtifact.BuildEquipmentTagPlans</c> needs to project equipment tags, and a
|
||||||
|
/// <c>DispatchDeployment</c> triggers the <c>ApplyAndAck → PushDesiredSubscriptions</c> pass
|
||||||
|
/// that builds the map. The OPC UA sink and the dependency mux are injected as
|
||||||
|
/// <see cref="Akka.TestKit.TestProbe"/>s.
|
||||||
|
/// </para>
|
||||||
|
/// </summary>
|
||||||
|
public sealed class DriverHostActorLiveValueTests : RuntimeActorTestBase
|
||||||
|
{
|
||||||
|
private static readonly NodeId TestNode = NodeId.Parse("driver-lv-test");
|
||||||
|
private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
|
||||||
|
private static readonly DateTime Ts = new(2026, 6, 13, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
|
||||||
|
/// <summary>A driver value published by FullName lands on the equipment variable's folder-scoped
|
||||||
|
/// NodeId (here <c>eq-1/speed</c>, no sub-folder), carrying the value/quality/timestamp.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Driver_value_routes_to_folder_scoped_equipment_NodeId()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
// One equipment tag: eq-1, drv-1, FullName "40001", no folder, Name "speed".
|
||||||
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
|
(Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed"));
|
||||||
|
|
||||||
|
var (actor, publish, mux) = SpawnHostAndApply(db, deploymentId);
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.AttributeValuePublished(
|
||||||
|
"drv-1", "40001", 42.0, OpcUaQuality.Good, Ts));
|
||||||
|
|
||||||
|
var update = publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>(TimeSpan.FromSeconds(5));
|
||||||
|
update.NodeId.ShouldBe("eq-1/speed");
|
||||||
|
update.Value.ShouldBe(42.0);
|
||||||
|
update.Quality.ShouldBe(OpcUaQuality.Good);
|
||||||
|
update.TimestampUtc.ShouldBe(Ts);
|
||||||
|
|
||||||
|
// The raw value is still forwarded to the dependency mux (VirtualTag inputs, keyed by FullName).
|
||||||
|
mux.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(5))
|
||||||
|
.FullReference.ShouldBe("40001");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>The same driver ref backing two equipments fans out: a single publish produces one
|
||||||
|
/// <see cref="OpcUaPublishActor.AttributeValueUpdate"/> per equipment variable NodeId.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Same_ref_on_two_equipments_fans_out_to_both_NodeIds()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
// Same (drv-1, "40001") wire-ref backs eq-1/speed AND eq-2/speed (identical machines).
|
||||||
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
|
(Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed"),
|
||||||
|
(Equip: "eq-2", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed"));
|
||||||
|
|
||||||
|
var (actor, publish, _) = SpawnHostAndApply(db, deploymentId);
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.AttributeValuePublished(
|
||||||
|
"drv-1", "40001", 7.0, OpcUaQuality.Good, Ts));
|
||||||
|
|
||||||
|
// One publish → two updates. Assert the SET of NodeIds (order is not contractual).
|
||||||
|
var first = publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>(TimeSpan.FromSeconds(5));
|
||||||
|
var second = publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>(TimeSpan.FromSeconds(5));
|
||||||
|
new[] { first.NodeId, second.NodeId }.ShouldBe(
|
||||||
|
new[] { "eq-1/speed", "eq-2/speed" }, ignoreOrder: true);
|
||||||
|
first.Value.ShouldBe(7.0);
|
||||||
|
second.Value.ShouldBe(7.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>A value for a ref not in the composition is NOT pushed to the OPC UA sink (no matching
|
||||||
|
/// NodeId), but is still forwarded to the dependency mux.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Unmatched_ref_is_dropped_from_sink_but_still_forwarded_to_mux()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
|
(Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed"));
|
||||||
|
|
||||||
|
var (actor, publish, mux) = SpawnHostAndApply(db, deploymentId);
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.AttributeValuePublished(
|
||||||
|
"drv-1", "59999", 99.0, OpcUaQuality.Good, Ts));
|
||||||
|
|
||||||
|
// No equipment-tag NodeId for ("drv-1","59999") → nothing reaches the sink.
|
||||||
|
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||||
|
// The raw publish still reaches the dependency mux.
|
||||||
|
mux.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(5))
|
||||||
|
.FullReference.ShouldBe("59999");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Spawns the host with publish + mux probes, dispatches the deployment, and waits for the
|
||||||
|
/// Applied ACK so the apply (and thus the map build in PushDesiredSubscriptions) has completed
|
||||||
|
/// before the test publishes a value. A VirtualTag-host probe is injected so the real host isn't
|
||||||
|
/// spawned (it would consume the mux-forwarded publishes otherwise — see SpawnVirtualTagHost).</summary>
|
||||||
|
private (IActorRef Actor, Akka.TestKit.TestProbe Publish, Akka.TestKit.TestProbe Mux) SpawnHostAndApply(
|
||||||
|
IDbContextFactory<OtOpcUaConfigDbContext> db, DeploymentId deploymentId)
|
||||||
|
{
|
||||||
|
var coordinator = CreateTestProbe();
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var mux = CreateTestProbe();
|
||||||
|
var vtHost = CreateTestProbe();
|
||||||
|
|
||||||
|
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||||
|
db, TestNode, coordinator.Ref,
|
||||||
|
localRoles: new HashSet<string> { "driver" },
|
||||||
|
dependencyMux: mux.Ref,
|
||||||
|
opcUaPublishActor: publish.Ref,
|
||||||
|
virtualTagEvaluator: NullVirtualTagEvaluator.Instance,
|
||||||
|
virtualTagHostOverride: vtHost.Ref));
|
||||||
|
|
||||||
|
actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
|
||||||
|
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied);
|
||||||
|
|
||||||
|
// RebuildAddressSpace also lands on the publish probe during apply; drain it so the test's
|
||||||
|
// ExpectMsg<AttributeValueUpdate> assertions see only value updates.
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.RebuildAddressSpace>(TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
return (actor, publish, mux);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Seeds a Sealed deployment whose artifact carries the minimal arrays
|
||||||
|
/// <c>DeploymentArtifact.BuildEquipmentTagPlans</c> needs to project equipment tags:
|
||||||
|
/// <c>Namespaces</c> (one Equipment-kind ns), <c>DriverInstances</c> (each driver bound to that
|
||||||
|
/// ns), and <c>Tags</c> (each with a non-null EquipmentId + a TagConfig blob carrying FullName).
|
||||||
|
/// </summary>
|
||||||
|
private static DeploymentId SeedDeploymentWithEquipmentTags(
|
||||||
|
IDbContextFactory<OtOpcUaConfigDbContext> db, RevisionHash rev,
|
||||||
|
params (string Equip, string Driver, string FullName, string? Folder, string Name)[] tags)
|
||||||
|
{
|
||||||
|
var driverIds = tags.Select(t => t.Driver).Distinct(StringComparer.Ordinal).ToArray();
|
||||||
|
|
||||||
|
var artifact = JsonSerializer.SerializeToUtf8Bytes(new
|
||||||
|
{
|
||||||
|
Namespaces = new[]
|
||||||
|
{
|
||||||
|
new { NamespaceId = "ns-eq", Kind = 0 }, // NamespaceKind.Equipment = 0
|
||||||
|
},
|
||||||
|
DriverInstances = driverIds.Select(d => new
|
||||||
|
{
|
||||||
|
DriverInstanceId = d,
|
||||||
|
NamespaceId = "ns-eq",
|
||||||
|
}).ToArray(),
|
||||||
|
Tags = tags.Select((t, i) => new
|
||||||
|
{
|
||||||
|
TagId = $"tag-{i}",
|
||||||
|
EquipmentId = t.Equip,
|
||||||
|
DriverInstanceId = t.Driver,
|
||||||
|
Name = t.Name,
|
||||||
|
FolderPath = t.Folder,
|
||||||
|
DataType = "Double",
|
||||||
|
TagConfig = JsonSerializer.Serialize(new { FullName = t.FullName }),
|
||||||
|
}).ToArray(),
|
||||||
|
});
|
||||||
|
|
||||||
|
var id = DeploymentId.NewId();
|
||||||
|
using var ctx = db.CreateDbContext();
|
||||||
|
ctx.Deployments.Add(new Deployment
|
||||||
|
{
|
||||||
|
DeploymentId = id.Value,
|
||||||
|
RevisionHash = rev.Value,
|
||||||
|
Status = DeploymentStatus.Sealed,
|
||||||
|
CreatedBy = "test",
|
||||||
|
SealedAtUtc = DateTime.UtcNow,
|
||||||
|
ArtifactBlob = artifact,
|
||||||
|
});
|
||||||
|
ctx.SaveChanges();
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user