refactor(otopcua): extract authored-only send helper + empty-authored dropped-path test (follow-up D)
This commit is contained in:
@@ -1250,16 +1250,26 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
// authored values once per cached driver per redeploy. Net effect: exactly ONE send per driver per pass.
|
// authored values once per cached driver per redeploy. Net effect: exactly ONE send per driver per pass.
|
||||||
var cachedDriverIds = _discoveredByDriver.Keys.ToHashSet(StringComparer.Ordinal);
|
var cachedDriverIds = _discoveredByDriver.Keys.ToHashSet(StringComparer.Ordinal);
|
||||||
|
|
||||||
|
// One authored-only push (value refs + alarm refs from the maps built above), shared by the bulk loop AND
|
||||||
|
// the dropped-driver fallback so the two CANNOT drift: the fallback's correctness depends on sending the
|
||||||
|
// SAME payload the bulk loop would have, so it's a shared helper (structural), not a comment-maintained
|
||||||
|
// invariant. An EMPTY set is valid — the child's Connected handler routes it to Unsubscribe (dropping a
|
||||||
|
// stale handle) rather than a spurious subscribe. Returns the value-ref count for the bulk-loop log total.
|
||||||
|
int SendAuthoredOnly(IActorRef actor, string driverId)
|
||||||
|
{
|
||||||
|
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
|
||||||
|
var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty<string>();
|
||||||
|
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
|
||||||
|
return refs.Count;
|
||||||
|
}
|
||||||
|
|
||||||
var total = 0;
|
var total = 0;
|
||||||
foreach (var (driverId, entry) in _children)
|
foreach (var (driverId, entry) in _children)
|
||||||
{
|
{
|
||||||
// Cached drivers are owned exclusively by the re-inject tail (one send each) — skip here. Non-cached
|
// Cached drivers are owned exclusively by the re-inject tail (one send each) — skip here. Non-cached
|
||||||
// drivers keep the bulk authored-only send exactly as before.
|
// drivers keep the bulk authored-only send exactly as before.
|
||||||
if (cachedDriverIds.Contains(driverId)) continue;
|
if (cachedDriverIds.Contains(driverId)) continue;
|
||||||
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
|
total += SendAuthoredOnly(entry.Actor, driverId);
|
||||||
var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty<string>();
|
|
||||||
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
|
|
||||||
total += refs.Count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (total > 0)
|
if (total > 0)
|
||||||
@@ -1371,13 +1381,11 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
// set NOW (the SAME payload the bulk loop computes), so the authored tags subscribe in THIS pass.
|
// set NOW (the SAME payload the bulk loop computes), so the authored tags subscribe in THIS pass.
|
||||||
// (The TriggerRediscovery above handles the async FixedTree re-graft separately; this just keeps
|
// (The TriggerRediscovery above handles the async FixedTree re-graft separately; this just keeps
|
||||||
// the authored values live meanwhile.) Guarded on the child still existing — a driver removed by
|
// the authored values live meanwhile.) Guarded on the child still existing — a driver removed by
|
||||||
// ReconcileDrivers has no child and correctly gets no send.
|
// ReconcileDrivers has no child and correctly gets no send. Shares SendAuthoredOnly with the bulk
|
||||||
|
// loop so the payload can't drift; a ZERO-authored driver sends an empty set → Unsubscribe (drops
|
||||||
|
// the stale FixedTree handle without a spurious subscribe).
|
||||||
if (_children.TryGetValue(driverId, out var fallbackEntry))
|
if (_children.TryGetValue(driverId, out var fallbackEntry))
|
||||||
{
|
SendAuthoredOnly(fallbackEntry.Actor, driverId);
|
||||||
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
|
|
||||||
var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty<string>();
|
|
||||||
fallbackEntry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ApplyDiscoveredPlansForDriver(driverId, plansByEquipment);
|
ApplyDiscoveredPlansForDriver(driverId, plansByEquipment);
|
||||||
|
|||||||
@@ -773,6 +773,59 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1);
|
factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Follow-up D (one-send invariant — EMPTY-authored DROPPED path, a capability THIS branch newly
|
||||||
|
/// enabled): a CACHED driver with ZERO authored tags (bound to its equipment only via
|
||||||
|
/// <see cref="EquipmentNode.DriverInstanceId"/>) whose FixedTree plan is FULLY DROPPED by a rebind redeploy
|
||||||
|
/// receives an EMPTY authored-only fallback <see cref="DriverInstanceActor.SetDesiredSubscriptions"/>, which
|
||||||
|
/// the Connected handler routes to <c>Unsubscribe</c> (dropping the stale FixedTree handle) — NOT a subscribe.
|
||||||
|
/// Proven by <c>SubscribeCount</c> staying FLAT across the redeploy (no spurious subscribe), closing the
|
||||||
|
/// SubscribeCount-proxy blind spot for the empty-set fallback.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Cached_tag_less_driver_fully_dropped_redeploy_sends_empty_fallback_without_subscribing()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var factory = new SubscribingDriverFactory("Modbus");
|
||||||
|
// d1 bound to EQ-1 via EquipmentNode.DriverInstanceId, with NO authored tags.
|
||||||
|
var deploymentId = SeedDeploymentWithTagLessEquipment(db, RevA, equipmentId: "EQ-1", driverId: "d1");
|
||||||
|
|
||||||
|
var (actor, publish, coordinator) = SpawnHostAndApply(db, deploymentId, factory);
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[]
|
||||||
|
{
|
||||||
|
new DiscoveredNode(
|
||||||
|
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
|
||||||
|
BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1",
|
||||||
|
DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null,
|
||||||
|
Writable: false, IsHistorized: false),
|
||||||
|
}));
|
||||||
|
|
||||||
|
// First injection: the discovered FixedTree materialises under EQ-1 and the child subscribes the
|
||||||
|
// discovered-only set (no authored ref to union) — this populates _discoveredByDriver[d1] = { EQ-1: plan }.
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.MaterialiseDiscoveredNodes>(Timeout);
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
var refs = factory.LastSubscribedRefs;
|
||||||
|
refs.ShouldNotBeNull();
|
||||||
|
refs!.ShouldContain("ft-ref-1");
|
||||||
|
}, duration: Timeout);
|
||||||
|
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||||
|
var countBeforeRedeploy = factory.SubscribeCount;
|
||||||
|
|
||||||
|
// Redeploy REBINDING the tag-less d1 EQ-1 → EQ-2 (still tag-less; DriverConfig "{}" unchanged ⇒ child NOT
|
||||||
|
// restarted). The cached EQ-1 plan is no longer a candidate ⇒ dropped ⇒ inner map empties ⇒ d1 removed
|
||||||
|
// from _discoveredByDriver. With ZERO authored tags the fallback set is EMPTY ⇒ the child Unsubscribes.
|
||||||
|
var deploymentId2 = SeedDeploymentWithTagLessEquipment(db, RevB, equipmentId: "EQ-2", driverId: "d1");
|
||||||
|
actor.Tell(new DispatchDeployment(deploymentId2, RevB, CorrelationId.NewId()));
|
||||||
|
coordinator.ExpectMsg<ApplyAck>(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied);
|
||||||
|
publish.ExpectMsg<OpcUaPublishActor.RebuildAddressSpace>(Timeout);
|
||||||
|
// No re-materialise — the cached plan was dropped (not re-grafted).
|
||||||
|
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||||
|
|
||||||
|
// The empty fallback routes to Unsubscribe, NOT SubscribeAsync ⇒ the count does NOT rise. (A non-empty or
|
||||||
|
// spurious subscribe — the bug this guards against — would increment it.)
|
||||||
|
factory.SubscribeCount.ShouldBe(countBeforeRedeploy);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Spawns the host with the subscribing driver factory + a publish probe, dispatches the
|
/// <summary>Spawns the host with the subscribing driver factory + a publish probe, dispatches the
|
||||||
/// deployment, and waits for the Applied ACK so the apply (and thus <c>_lastComposition</c> + the live
|
/// deployment, and waits for the Applied ACK so the apply (and thus <c>_lastComposition</c> + the live
|
||||||
/// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A
|
/// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A
|
||||||
|
|||||||
Reference in New Issue
Block a user