fix(otopcua): skip redundant discovered-node re-apply + guard tests
This commit is contained in:
@@ -620,10 +620,31 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
var plan = DiscoveredNodeMapper.Map(equipmentId, msg.Nodes, authoredRefs);
|
var plan = DiscoveredNodeMapper.Map(equipmentId, msg.Nodes, authoredRefs);
|
||||||
if (plan.Variables.Count == 0) return; // nothing new to inject (all captured nodes were authored)
|
if (plan.Variables.Count == 0) return; // nothing new to inject (all captured nodes were authored)
|
||||||
|
|
||||||
|
// Unchanged-plan short-circuit: Task 6's driver re-discovers every ~2s (up to ~15 passes) until the
|
||||||
|
// FixedTree set stabilises, re-sending DiscoveredNodesReady each pass. Re-applying an IDENTICAL plan
|
||||||
|
// would re-send SetDesiredSubscriptions, forcing the child to UnsubscribeAsync (dropping the WHOLE
|
||||||
|
// handle — authored tags included) then re-Subscribe — blipping authored-tag values up to ~15× across
|
||||||
|
// the discovery window. Skip when the routing is unchanged from the last applied pass; a GROWING set
|
||||||
|
// still differs (superset) and re-applies. This is _discoveredByDriver's first reader.
|
||||||
|
if (_discoveredByDriver.TryGetValue(msg.DriverInstanceId, out var cached)
|
||||||
|
&& RoutingEquals(cached.RoutingByRef, plan.RoutingByRef))
|
||||||
|
{
|
||||||
|
_log.Debug("DriverHost {Node}: discovered set for driver {Driver} unchanged ({Count} node(s)) — re-apply skipped",
|
||||||
|
_localNode, msg.DriverInstanceId, plan.Variables.Count);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
_discoveredByDriver[msg.DriverInstanceId] = plan;
|
_discoveredByDriver[msg.DriverInstanceId] = plan;
|
||||||
ApplyDiscoveredPlan(msg.DriverInstanceId, equipmentId, plan);
|
ApplyDiscoveredPlan(msg.DriverInstanceId, equipmentId, plan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Routing-map equality: same count + every key maps to the same NodeId. Lets
|
||||||
|
/// <see cref="HandleDiscoveredNodes"/> skip re-applying an unchanged discovered set across the driver's
|
||||||
|
/// repeated post-connect re-discovery passes (a grown/changed set differs and re-applies).</summary>
|
||||||
|
private static bool RoutingEquals(IReadOnlyDictionary<string, string> a, IReadOnlyDictionary<string, string> b)
|
||||||
|
=> a.Count == b.Count
|
||||||
|
&& a.All(kv => b.TryGetValue(kv.Key, out var v) && string.Equals(v, kv.Value, StringComparison.Ordinal));
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Grafts a <see cref="DiscoveredInjectionPlan"/> onto the served state: extends the live-value
|
/// Grafts a <see cref="DiscoveredInjectionPlan"/> onto the served state: extends the live-value
|
||||||
/// routing map (mirroring <see cref="PushDesiredSubscriptions"/>' fan-out so <see cref="ForwardToMux"/>
|
/// routing map (mirroring <see cref="PushDesiredSubscriptions"/>' fan-out so <see cref="ForwardToMux"/>
|
||||||
@@ -636,7 +657,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private void ApplyDiscoveredPlan(string driverId, string equipmentId, DiscoveredInjectionPlan plan)
|
private void ApplyDiscoveredPlan(string driverId, string equipmentId, DiscoveredInjectionPlan plan)
|
||||||
{
|
{
|
||||||
// Extend the live-value routing map (fan-out), mirroring PushDesiredSubscriptions' pattern.
|
// Extend the live-value routing map (fan-out), mirroring PushDesiredSubscriptions' pattern. This is
|
||||||
|
// purely ADDITIVE across passes: a shrinking discovery set would leave the dropped refs' stale routes
|
||||||
|
// until the next full apply (PushDesiredSubscriptions) clears + rebuilds the maps — acceptable because
|
||||||
|
// a FOCAS FixedTree only grows-then-stabilises, never shrinks within a connect.
|
||||||
foreach (var (driverRef, nodeId) in plan.RoutingByRef)
|
foreach (var (driverRef, nodeId) in plan.RoutingByRef)
|
||||||
{
|
{
|
||||||
var key = (driverId, driverRef);
|
var key = (driverId, driverRef);
|
||||||
@@ -654,6 +678,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
// and ForwardToMux routes the values. Recompute the authored value + alarm refs the same way
|
// and ForwardToMux routes the values. Recompute the authored value + alarm refs the same way
|
||||||
// PushDesiredSubscriptions does, then union the FixedTree refs onto the value set.
|
// PushDesiredSubscriptions does, then union the FixedTree refs onto the value set.
|
||||||
if (!_children.TryGetValue(driverId, out var entry)) return;
|
if (!_children.TryGetValue(driverId, out var entry)) return;
|
||||||
|
// The _lastComposition null-guards below are defensive: HandleDiscoveredNodes already proved it
|
||||||
|
// non-null, but Task 8 will also call ApplyDiscoveredPlan from the PushDesiredSubscriptions tail —
|
||||||
|
// keep them so that re-apply path can't NRE.
|
||||||
var authoredValueRefs = _lastComposition is null
|
var authoredValueRefs = _lastComposition is null
|
||||||
? Enumerable.Empty<string>()
|
? Enumerable.Empty<string>()
|
||||||
: _lastComposition.EquipmentTags
|
: _lastComposition.EquipmentTags
|
||||||
@@ -899,6 +926,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
Receive<RouteNativeAlarmAck>(msg =>
|
Receive<RouteNativeAlarmAck>(msg =>
|
||||||
_log.Debug("DriverHost {Node}: dropping native-alarm ack for {Node2} while Stale (config DB unreachable)",
|
_log.Debug("DriverHost {Node}: dropping native-alarm ack for {Node2} while Stale (config DB unreachable)",
|
||||||
_localNode, msg.ConditionNodeId));
|
_localNode, msg.ConditionNodeId));
|
||||||
|
// A driver child's post-connect DiscoveredNodesReady can't be injected while Stale (no composition is
|
||||||
|
// applied yet, so the equipment can't be resolved). Drop it — Task 6's re-discovery loop re-sends it
|
||||||
|
// and the Task-8 post-recovery re-apply self-heal it once an apply runs (matches the no-op drops above).
|
||||||
|
Receive<DriverInstanceActor.DiscoveredNodesReady>(_ => { });
|
||||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||||
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
|
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
|
||||||
}
|
}
|
||||||
|
|||||||
+124
@@ -112,6 +112,126 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
update.TimestampUtc.ShouldBe(Ts);
|
update.TimestampUtc.ShouldBe(Ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Guard: a <see cref="DriverInstanceActor.DiscoveredNodesReady"/> arriving BEFORE any deployment
|
||||||
|
/// is applied (<c>_lastComposition</c> still null) is ignored — nothing is materialised on the publish
|
||||||
|
/// side (the equipment can't be resolved without a composition).</summary>
|
||||||
|
[Fact]
|
||||||
|
public void DiscoveredNodes_before_any_apply_are_ignored()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var coordinator = CreateTestProbe();
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var vtHost = CreateTestProbe();
|
||||||
|
|
||||||
|
// No deployment dispatched ⇒ Bootstrap enters Steady with no composition ⇒ _lastComposition is null.
|
||||||
|
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||||
|
db, TestNode, coordinator.Ref,
|
||||||
|
driverFactory: new SubscribingDriverFactory("Modbus"),
|
||||||
|
localRoles: new HashSet<string> { "driver" },
|
||||||
|
opcUaPublishActor: publish.Ref,
|
||||||
|
virtualTagHostOverride: vtHost.Ref));
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[]
|
||||||
|
{
|
||||||
|
new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
||||||
|
BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false),
|
||||||
|
}));
|
||||||
|
|
||||||
|
// No composition ⇒ no materialise (and no RebuildAddressSpace either, since nothing was applied).
|
||||||
|
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Dedup: a discovered node whose <c>FullReference</c> equals an authored equipment tag's
|
||||||
|
/// FullName is NOT injected (it would shadow the authored node) — only the genuinely-new FixedTree refs
|
||||||
|
/// are materialised.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Discovered_node_shadowing_an_authored_ref_is_not_injected()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var factory = new SubscribingDriverFactory("Modbus");
|
||||||
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
|
||||||
|
var (actor, publish) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
|
// Two captured nodes: one SHADOWS the authored ref "40001" (must be dropped), one is genuinely new.
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[]
|
||||||
|
{
|
||||||
|
new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Registers" },
|
||||||
|
BrowseName: "speed", DisplayName: "Speed", FullReference: "40001",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false),
|
||||||
|
new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
||||||
|
BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false),
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Exactly ONE variable materialised — the new "Model", not the authored-shadow "Speed".
|
||||||
|
var materialise = publish.ExpectMsg<OpcUaPublishActor.MaterialiseDiscoveredNodes>(Timeout);
|
||||||
|
materialise.Variables.Count.ShouldBe(1);
|
||||||
|
materialise.Variables[0].DisplayName.ShouldBe("Model");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Idempotency / the unchanged-plan short-circuit: re-sending the SAME discovered set (the driver
|
||||||
|
/// re-discovers each ~2s pass) is a no-op — it materialises ONCE and does not force the child to
|
||||||
|
/// re-subscribe. A GROWN set, however, DOES re-apply (materialise again + re-subscribe), so a stabilising
|
||||||
|
/// FixedTree still converges.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Repeated_identical_discovery_does_not_reapply_but_a_grown_set_does()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var factory = new SubscribingDriverFactory("Modbus");
|
||||||
|
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
|
||||||
|
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
|
||||||
|
|
||||||
|
var (actor, publish) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
|
var node1 = new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
||||||
|
BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false);
|
||||||
|
var node2 = new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Status" },
|
||||||
|
BrowseName: "Run", DisplayName: "Run", FullReference: "ft-ref-2",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false);
|
||||||
|
|
||||||
|
// Pass 1: a new set ⇒ one materialise + a union re-subscribe.
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[] { node1 }));
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.MaterialiseDiscoveredNodes>(Timeout);
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
var refs = factory.LastSubscribedRefs;
|
||||||
|
refs.ShouldNotBeNull();
|
||||||
|
refs!.ShouldContain("ft-ref-1");
|
||||||
|
}, duration: Timeout);
|
||||||
|
var subscribeCountAfterFirst = factory.SubscribeCount;
|
||||||
|
|
||||||
|
// Pass 2: the IDENTICAL set ⇒ short-circuited (no materialise, no re-subscribe).
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[] { node1 }));
|
||||||
|
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||||
|
factory.SubscribeCount.ShouldBe(subscribeCountAfterFirst);
|
||||||
|
|
||||||
|
// Pass 3: a GROWN set (superset) ⇒ re-applies (materialise again + re-subscribe with both refs).
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[] { node1, node2 }));
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.MaterialiseDiscoveredNodes>(Timeout).Variables.Count.ShouldBe(2);
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
factory.SubscribeCount.ShouldBeGreaterThan(subscribeCountAfterFirst);
|
||||||
|
var refs = factory.LastSubscribedRefs;
|
||||||
|
refs.ShouldNotBeNull();
|
||||||
|
refs!.ShouldContain("ft-ref-1");
|
||||||
|
refs.ShouldContain("ft-ref-2");
|
||||||
|
}, duration: Timeout);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Spawns the host with the subscribing driver factory + a publish probe, dispatches the
|
/// <summary>Spawns the host with the subscribing driver factory + a publish probe, dispatches the
|
||||||
/// deployment, and waits for the Applied ACK so the apply (and thus <c>_lastComposition</c> + the live
|
/// deployment, and waits for the Applied ACK so the apply (and thus <c>_lastComposition</c> + the live
|
||||||
/// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A
|
/// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A
|
||||||
@@ -209,6 +329,10 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
/// <summary>The reference set passed to the driver's most recent <c>SubscribeAsync</c> call.</summary>
|
/// <summary>The reference set passed to the driver's most recent <c>SubscribeAsync</c> call.</summary>
|
||||||
public IReadOnlyList<string>? LastSubscribedRefs => _driver.LastSubscribedRefs;
|
public IReadOnlyList<string>? LastSubscribedRefs => _driver.LastSubscribedRefs;
|
||||||
|
|
||||||
|
/// <summary>Number of <c>SubscribeAsync</c> calls so far — lets a test prove a redundant re-apply did
|
||||||
|
/// NOT force a (drop-then-)re-subscribe of the whole handle.</summary>
|
||||||
|
public int SubscribeCount => _driver.SubscribeCount;
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson) =>
|
public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson) =>
|
||||||
string.Equals(driverType, _supportedType, StringComparison.Ordinal) ? _driver : null;
|
string.Equals(driverType, _supportedType, StringComparison.Ordinal) ? _driver : null;
|
||||||
|
|||||||
Reference in New Issue
Block a user