feat(otopcua): re-inject discovered nodes after address-space rebuild
This commit is contained in:
@@ -1243,6 +1243,40 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
// reports its FixedTree. Set here (not in ApplyAndAck) so both the fresh-apply and bootstrap-restore
|
// reports its FixedTree. Set here (not in ApplyAndAck) so both the fresh-apply and bootstrap-restore
|
||||||
// paths — which both route through this method — leave a current composition.
|
// paths — which both route through this method — leave a current composition.
|
||||||
_lastComposition = composition;
|
_lastComposition = composition;
|
||||||
|
|
||||||
|
// Re-inject discovered (FixedTree) nodes after the authored rebuild. PushDesiredSubscriptions cleared
|
||||||
|
// _nodeIdByDriverRef and re-pushed authored-only subscriptions above; without this, every redeploy /
|
||||||
|
// bootstrap-restore would drop the injected FixedTree routes + materialised nodes until the driver
|
||||||
|
// happens to reconnect and re-discover. Re-resolve each cached driver's equipment from the CURRENT
|
||||||
|
// composition; drop the cache entry if the driver/equipment no longer resolves to exactly one (a rebind
|
||||||
|
// or removal — the driver's next reconnect re-discovery will rebuild it cleanly).
|
||||||
|
foreach (var driverId in _discoveredByDriver.Keys.ToList()) // snapshot — we mutate the dict below
|
||||||
|
{
|
||||||
|
var plan = _discoveredByDriver[driverId];
|
||||||
|
var equipmentIds = composition.EquipmentTags
|
||||||
|
.Where(t => string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
|
||||||
|
.Select(t => t.EquipmentId)
|
||||||
|
.Distinct(StringComparer.Ordinal)
|
||||||
|
.ToList();
|
||||||
|
if (equipmentIds.Count != 1)
|
||||||
|
{
|
||||||
|
_discoveredByDriver.Remove(driverId);
|
||||||
|
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver} — equipment no longer resolves uniquely", _localNode, driverId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
var equipmentId = equipmentIds[0];
|
||||||
|
// If the equipment was rebound (the cached plan's NodeIds are scoped to the OLD equipment), drop +
|
||||||
|
// let re-discovery rebuild against the new equipment. The plan's NodeIds are "{equipmentId}/...".
|
||||||
|
var planEquipmentConsistent = plan.Variables.Count > 0
|
||||||
|
&& plan.Variables[0].NodeId.StartsWith(equipmentId + "/", StringComparison.Ordinal);
|
||||||
|
if (!planEquipmentConsistent)
|
||||||
|
{
|
||||||
|
_discoveredByDriver.Remove(driverId);
|
||||||
|
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver} — equipment rebound", _localNode, driverId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ApplyDiscoveredPlan(driverId, equipmentId, plan);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void SpawnChild(DriverInstanceSpec spec)
|
private void SpawnChild(DriverInstanceSpec spec)
|
||||||
|
|||||||
+114
-5
@@ -44,6 +44,7 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
{
|
{
|
||||||
private static readonly NodeId TestNode = NodeId.Parse("driver-disc-test");
|
private static readonly NodeId TestNode = NodeId.Parse("driver-disc-test");
|
||||||
private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
|
private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
|
||||||
|
private static readonly RevisionHash RevB = RevisionHash.Parse(new string('b', 64));
|
||||||
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5);
|
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5);
|
||||||
private static readonly DateTime Ts = new(2026, 6, 26, 10, 0, 0, DateTimeKind.Utc);
|
private static readonly DateTime Ts = new(2026, 6, 26, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
|
||||||
@@ -63,7 +64,7 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
|
||||||
var (actor, publish) = SpawnHostAndApply(db, deploymentId, factory);
|
var (actor, publish, _) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
// A FixedTree discovered node whose FullReference DIFFERS from the authored tag's FullName, so the
|
// A FixedTree discovered node whose FullReference DIFFERS from the authored tag's FullName, so the
|
||||||
// mapper keeps it (it does not shadow an authored ref).
|
// mapper keeps it (it does not shadow an authored ref).
|
||||||
@@ -155,7 +156,7 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
|
||||||
var (actor, publish) = SpawnHostAndApply(db, deploymentId, factory);
|
var (actor, publish, _) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
// Two captured nodes: one SHADOWS the authored ref "40001" (must be dropped), one is genuinely new.
|
// Two captured nodes: one SHADOWS the authored ref "40001" (must be dropped), one is genuinely new.
|
||||||
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[]
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[]
|
||||||
@@ -190,7 +191,7 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
|
||||||
var (actor, publish) = SpawnHostAndApply(db, deploymentId, factory);
|
var (actor, publish, _) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
var node1 = new DiscoveredNode(
|
var node1 = new DiscoveredNode(
|
||||||
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
||||||
@@ -232,13 +233,121 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
}, duration: Timeout);
|
}, duration: Timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Task 8 survival: discovered (FixedTree) nodes injected after the first apply must SURVIVE a
|
||||||
|
/// redeploy. A second deployment re-runs <c>PushDesiredSubscriptions</c>, which clears the live-value
|
||||||
|
/// routing maps and re-pushes an authored-only subscription set; without the tail re-apply the injected
|
||||||
|
/// FixedTree routes + materialised nodes would be lost until the driver reconnects. Asserts the cached
|
||||||
|
/// plan is (a) re-materialised under EQ-1 after the rebuild, (b) still present in the child's
|
||||||
|
/// post-redeploy subscription, and (c) still routes a published value to its mapped NodeId.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Discovered_nodes_survive_a_redeploy_rebuild()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var factory = new SubscribingDriverFactory("Modbus");
|
||||||
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
|
||||||
|
var (actor, publish, coordinator) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
|
var discovered = new[]
|
||||||
|
{
|
||||||
|
new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
||||||
|
BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false),
|
||||||
|
};
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", discovered));
|
||||||
|
|
||||||
|
// First injection: materialise #1 under EQ-1 — capture the mapped NodeId for the survival asserts.
|
||||||
|
var materialise1 = publish.ExpectMsg<OpcUaPublishActor.MaterialiseDiscoveredNodes>(Timeout);
|
||||||
|
materialise1.EquipmentRootNodeId.ShouldBe("EQ-1");
|
||||||
|
materialise1.Variables.Count.ShouldBe(1);
|
||||||
|
var fixedTreeNodeId = materialise1.Variables[0].NodeId;
|
||||||
|
|
||||||
|
// Apply a SECOND deployment (new revision, SAME d1 → EQ-1 binding so HandleDispatchFromSteady doesn't
|
||||||
|
// short-circuit on an identical rev). This re-runs PushDesiredSubscriptions, which clears + rebuilds
|
||||||
|
// the routing maps and re-pushes the authored-only subscription set — the exact path Task 8 self-heals.
|
||||||
|
var deploymentId2 = SeedDeploymentWithEquipmentTags(db, RevB,
|
||||||
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
actor.Tell(new DispatchDeployment(deploymentId2, RevB, CorrelationId.NewId()));
|
||||||
|
coordinator.ExpectMsg<ApplyAck>(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied);
|
||||||
|
|
||||||
|
// The redeploy fires a fresh RebuildAddressSpace first (drain it) ...
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.RebuildAddressSpace>(Timeout);
|
||||||
|
|
||||||
|
// (a) ... then the cached discovered plan is RE-MATERIALISED under EQ-1 (the Task-8 tail re-apply),
|
||||||
|
// at the SAME NodeId the first injection placed it.
|
||||||
|
var materialise2 = publish.ExpectMsg<OpcUaPublishActor.MaterialiseDiscoveredNodes>(Timeout);
|
||||||
|
materialise2.EquipmentRootNodeId.ShouldBe("EQ-1");
|
||||||
|
materialise2.Variables.Count.ShouldBe(1);
|
||||||
|
materialise2.Variables[0].NodeId.ShouldBe(fixedTreeNodeId);
|
||||||
|
|
||||||
|
// (b) The child's post-redeploy subscription STILL carries the FixedTree ref — it was re-merged onto
|
||||||
|
// the freshly-cleared authored-only set, not dropped by the _nodeIdByDriverRef.Clear().
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
var refs = factory.LastSubscribedRefs;
|
||||||
|
refs.ShouldNotBeNull();
|
||||||
|
refs!.ShouldContain("40001");
|
||||||
|
refs.ShouldContain("ft-ref-1");
|
||||||
|
}, duration: Timeout);
|
||||||
|
|
||||||
|
// (c) A value published for the FixedTree ref STILL routes to its mapped NodeId — proving the
|
||||||
|
// live-value routing map was rebuilt by the re-apply (not left empty after the Clear()).
|
||||||
|
actor.Tell(new DriverInstanceActor.AttributeValuePublished("d1", "ft-ref-1", 42.0, OpcUaQuality.Good, Ts));
|
||||||
|
var update = publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>(Timeout);
|
||||||
|
update.NodeId.ShouldBe(fixedTreeNodeId);
|
||||||
|
update.Value.ShouldBe(42.0);
|
||||||
|
update.Quality.ShouldBe(OpcUaQuality.Good);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Task 8 cache-drop: a redeploy whose composition no longer binds the driver to any equipment
|
||||||
|
/// (its authored tags were removed) must DROP the cached discovered plan rather than re-graft it onto a
|
||||||
|
/// stale equipment. After such a redeploy the host re-applies the authored rebuild but does NOT re-tell
|
||||||
|
/// <see cref="OpcUaPublishActor.MaterialiseDiscoveredNodes"/> for the now-unresolved driver.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Discovered_nodes_dropped_when_equipment_no_longer_resolves()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var factory = new SubscribingDriverFactory("Modbus");
|
||||||
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
|
||||||
|
var (actor, publish, coordinator) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[]
|
||||||
|
{
|
||||||
|
new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
||||||
|
BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false),
|
||||||
|
}));
|
||||||
|
|
||||||
|
// First injection materialises under EQ-1.
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.MaterialiseDiscoveredNodes>(Timeout);
|
||||||
|
|
||||||
|
// Apply a SECOND deployment that binds a DIFFERENT driver (d2 → EQ-2) and carries NO authored tags for
|
||||||
|
// d1, so d1's equipment can no longer be resolved (equipmentIds.Count == 0) — the cache entry is dropped.
|
||||||
|
var deploymentId2 = SeedDeploymentWithEquipmentTags(db, RevB,
|
||||||
|
(Equip: "EQ-2", Driver: "d2", FullName: "40002", Folder: (string?)null, Name: "speed2"));
|
||||||
|
actor.Tell(new DispatchDeployment(deploymentId2, RevB, CorrelationId.NewId()));
|
||||||
|
coordinator.ExpectMsg<ApplyAck>(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied);
|
||||||
|
|
||||||
|
// The redeploy fires a fresh RebuildAddressSpace; after draining it, NO MaterialiseDiscoveredNodes is
|
||||||
|
// re-told (the cached d1 plan was dropped because its equipment no longer resolves).
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.RebuildAddressSpace>(Timeout);
|
||||||
|
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Spawns the host with the subscribing driver factory + a publish probe, dispatches the
|
/// <summary>Spawns the host with the subscribing driver factory + a publish probe, dispatches the
|
||||||
/// deployment, and waits for the Applied ACK so the apply (and thus <c>_lastComposition</c> + the live
|
/// deployment, and waits for the Applied ACK so the apply (and thus <c>_lastComposition</c> + the live
|
||||||
/// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A
|
/// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A
|
||||||
/// VirtualTag-host probe is injected so the real host isn't spawned. The <see cref="OpcUaPublishActor.RebuildAddressSpace"/>
|
/// VirtualTag-host probe is injected so the real host isn't spawned. The <see cref="OpcUaPublishActor.RebuildAddressSpace"/>
|
||||||
/// that lands on the publish probe during apply is drained so the test's materialise / value-update
|
/// that lands on the publish probe during apply is drained so the test's materialise / value-update
|
||||||
/// assertions see only post-apply traffic.</summary>
|
/// assertions see only post-apply traffic.</summary>
|
||||||
private (IActorRef Actor, Akka.TestKit.TestProbe Publish) SpawnHostAndApply(
|
private (IActorRef Actor, Akka.TestKit.TestProbe Publish, Akka.TestKit.TestProbe Coordinator) SpawnHostAndApply(
|
||||||
IDbContextFactory<OtOpcUaConfigDbContext> db, DeploymentId deploymentId, IDriverFactory factory)
|
IDbContextFactory<OtOpcUaConfigDbContext> db, DeploymentId deploymentId, IDriverFactory factory)
|
||||||
{
|
{
|
||||||
var coordinator = CreateTestProbe();
|
var coordinator = CreateTestProbe();
|
||||||
@@ -257,7 +366,7 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
|
|
||||||
publish.ExpectMsg<OpcUaPublishActor.RebuildAddressSpace>(Timeout);
|
publish.ExpectMsg<OpcUaPublishActor.RebuildAddressSpace>(Timeout);
|
||||||
|
|
||||||
return (actor, publish);
|
return (actor, publish, coordinator);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
Reference in New Issue
Block a user