From 05c820795ab5332d85dafcc203e7b040d6c51aa4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 14:30:16 -0400 Subject: [PATCH] perf(otopcua): one SetDesiredSubscriptions per driver per redeploy (follow-up D) --- .../Drivers/DriverHostActor.cs | 25 +++ .../Drivers/DriverHostActorDiscoveryTests.cs | 178 ++++++++++++++++++ 2 files changed, 203 insertions(+) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 8945623e..127b8a33 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -1241,9 +1241,21 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _driverRefByNodeId[nodeId] = key; } + // Snapshot the cached (FixedTree-discovered) driver set BEFORE the bulk loop, while _discoveredByDriver + // is still untouched (the re-inject tail below drops/removes entries). Cached drivers are SKIPPED in the + // bulk loop because the tail sends each of them EXACTLY ONE SetDesiredSubscriptions for this pass: the + // authored∪discovered union (ApplyDiscoveredPlansForDriver) for a survivor, or — if its plan is fully + // dropped — an authored-only fallback. Sending the bulk authored-only set HERE too would force the child + // to drop the whole handle (authored tags included) then re-subscribe — an extra unsub/resub blip of the + // authored values once per cached driver per redeploy. Net effect: exactly ONE send per driver per pass. + var cachedDriverIds = _discoveredByDriver.Keys.ToHashSet(StringComparer.Ordinal); + var total = 0; foreach (var (driverId, entry) in _children) { + // Cached drivers are owned exclusively by the re-inject tail (one send each) — skip here. Non-cached + // drivers keep the bulk authored-only send exactly as before. + if (cachedDriverIds.Contains(driverId)) continue; var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty(); var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty(); entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs)); @@ -1353,6 +1365,19 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers if (plansByEquipment.Count == 0) { _discoveredByDriver.Remove(driverId); + // FALLBACK (one-send invariant): this driver was SKIPPED in the bulk loop (it was cached), and its + // plan is now FULLY DROPPED — so ApplyDiscoveredPlansForDriver won't run for it and it would + // otherwise receive ZERO sends this pass, losing its AUTHORED subscriptions. Send the authored-only + // set NOW (the SAME payload the bulk loop computes), so the authored tags subscribe in THIS pass. + // (The TriggerRediscovery above handles the async FixedTree re-graft separately; this just keeps + // the authored values live meanwhile.) Guarded on the child still existing — a driver removed by + // ReconcileDrivers has no child and correctly gets no send. + if (_children.TryGetValue(driverId, out var fallbackEntry)) + { + var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty(); + var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty(); + fallbackEntry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs)); + } continue; } ApplyDiscoveredPlansForDriver(driverId, plansByEquipment); diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs index c96c19c4..9da3a4b8 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorDiscoveryTests.cs @@ -595,6 +595,184 @@ public sealed class DriverHostActorDiscoveryTests : RuntimeActorTestBase factory.DiscoverCount.ShouldBe(1); } + /// Follow-up D (one-send invariant — SURVIVOR path): a CACHED (FixedTree-discovered) driver whose + /// plan SURVIVES a redeploy must receive EXACTLY ONE + /// for that pass — the authored∪discovered UNION the re-inject tail sends — NOT the old TWO (a bulk + /// authored-only send that dropped the whole handle, then the tail union that re-subscribed it). Observed via + /// the shared driver's SubscribeCount (each non-empty SetDesiredSubscriptions ⇒ exactly one + /// SubscribeAsync — no de-dup in ): the count rises by EXACTLY 1 across the + /// redeploy and the final subscribed set is the union. (Pre-task: the bulk loop ALSO sent the authored-only + /// set first ⇒ the count rose by 2 and the set transiently dropped "ft-ref-1".) + [Fact] + public void Cached_driver_survivor_redeploy_sends_exactly_one_union_subscription() + { + var db = NewInMemoryDbFactory(); + var factory = new SubscribingDriverFactory("Modbus"); + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed")); + + var (actor, publish, coordinator) = SpawnHostAndApply(db, deploymentId, factory); + + actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[] + { + new DiscoveredNode( + FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" }, + BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1", + DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null, + Writable: false, IsHistorized: false), + })); + + // First injection: the union subscribe (authored "40001" + discovered "ft-ref-1") lands and the cache is + // populated (_discoveredByDriver[d1] = { EQ-1: plan }). + publish.ExpectMsg(Timeout); + AwaitAssert(() => + { + var refs = factory.LastSubscribedRefs; + refs.ShouldNotBeNull(); + refs!.ShouldContain("40001"); + refs.ShouldContain("ft-ref-1"); + }, duration: Timeout); + // Let the first-injection traffic settle, then snapshot the subscribe count as the redeploy baseline. + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + var countBeforeRedeploy = factory.SubscribeCount; + + // Redeploy the SAME composition (new revision so it applies; d1 → EQ-1 unchanged ⇒ the cached plan + // SURVIVES the re-inject tail ⇒ re-applied as a single union send). + var deploymentId2 = SeedDeploymentWithEquipmentTags(db, RevB, + (Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed")); + actor.Tell(new DispatchDeployment(deploymentId2, RevB, CorrelationId.NewId())); + coordinator.ExpectMsg(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied); + publish.ExpectMsg(Timeout); + publish.ExpectMsg(Timeout); // tail survivor re-materialise + + // EXACTLY ONE SetDesiredSubscriptions this redeploy: the count rises by 1 and the set is the union (the + // combined condition is UNSATISFIABLE under the old double-send — at count+1 the set was authored-only + // (no "ft-ref-1"), at count+2 the count overshoots — so this fails RED before the fix). + AwaitAssert(() => + { + var refs = factory.LastSubscribedRefs; + refs.ShouldNotBeNull(); + refs!.ShouldContain("40001"); + refs.ShouldContain("ft-ref-1"); + factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1); + }, duration: Timeout); + // Settle + re-confirm the count did NOT creep to +2 (the retired bulk authored-only + tail union double-send). + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1); + } + + /// Follow-up D (one-send invariant — DROPPED path): a CACHED driver whose plan is FULLY DROPPED by a + /// config-unchanged rebind (the inner map empties ⇒ the driver is removed from _discoveredByDriver) + /// must still receive EXACTLY ONE — the AUTHORED-ONLY + /// fallback — so its authored subscriptions are not lost now that the bulk loop SKIPS cached drivers. The + /// re-inject tail no longer re-applies a (now-empty) plan for it, so the fallback is the only send. Observed + /// via SubscribeCount (+1) and the subscribed set ("40001" only, NOT the dropped "ft-ref-1"). (It also + /// gets a — a different message type the non-discovery + /// child no-ops, so it adds no subscribe.) Guards that the bulk-skip didn't reduce this path to ZERO sends. + [Fact] + public void Cached_driver_fully_dropped_redeploy_sends_exactly_one_authored_only_fallback() + { + var db = NewInMemoryDbFactory(); + var factory = new SubscribingDriverFactory("Modbus"); + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed")); + + var (actor, publish, coordinator) = SpawnHostAndApply(db, deploymentId, factory); + + actor.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", new[] + { + new DiscoveredNode( + FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" }, + BrowseName: "Model", DisplayName: "Model", FullReference: "ft-ref-1", + DataType: DriverDataType.Float64, IsArray: false, ArrayDim: null, + Writable: false, IsHistorized: false), + })); + + publish.ExpectMsg(Timeout); + AwaitAssert(() => + { + var refs = factory.LastSubscribedRefs; + refs.ShouldNotBeNull(); + refs!.ShouldContain("40001"); + refs.ShouldContain("ft-ref-1"); + }, duration: Timeout); + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + var countBeforeRedeploy = factory.SubscribeCount; + + // Redeploy REBINDING d1 EQ-1 → EQ-2 (same FullName; DriverConfig "{}" unchanged ⇒ child NOT restarted). + // The cached EQ-1-scoped plan is dropped by the rebind guard ⇒ the inner map empties ⇒ d1 is removed from + // _discoveredByDriver ⇒ NO survivor re-apply. The fallback must send the authored-only set so "40001" + // stays subscribed this pass. + var deploymentId2 = SeedDeploymentWithEquipmentTags(db, RevB, + (Equip: "EQ-2", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed")); + actor.Tell(new DispatchDeployment(deploymentId2, RevB, CorrelationId.NewId())); + coordinator.ExpectMsg(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied); + publish.ExpectMsg(Timeout); + // No MaterialiseDiscoveredNodes — the plan was dropped, not re-grafted — so no further publish traffic. + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + + // EXACTLY ONE SetDesiredSubscriptions this redeploy: the authored-only fallback. The count rises by 1 and + // the set is "40001" only (the dropped FixedTree ref is gone). + AwaitAssert(() => + { + var refs = factory.LastSubscribedRefs; + refs.ShouldNotBeNull(); + refs!.ShouldContain("40001"); + refs.ShouldNotContain("ft-ref-1"); + refs.Count.ShouldBe(1); + factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1); + }, duration: Timeout); + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1); + } + + /// Follow-up D (one-send invariant — NON-CACHED path): a driver that was NEVER cached (no FixedTree + /// discovered) is unaffected by the cached-driver bulk-loop skip — it still gets EXACTLY ONE bulk + /// authored-only per redeploy (the re-inject tail + /// never runs for it). Guards that the skip didn't accidentally suppress (or double) a non-cached driver's + /// send. Observed via SubscribeCount (+1) and the subscribed set ("40001" only). + [Fact] + public void Non_cached_driver_redeploy_sends_exactly_one_authored_only_subscription() + { + var db = NewInMemoryDbFactory(); + var factory = new SubscribingDriverFactory("Modbus"); + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed")); + + var (actor, publish, coordinator) = SpawnHostAndApply(db, deploymentId, factory); + + // No DiscoveredNodesReady ⇒ d1 is never cached. Wait for the initial bulk subscribe to settle, then + // snapshot the count as the redeploy baseline. + AwaitAssert(() => + { + var refs = factory.LastSubscribedRefs; + refs.ShouldNotBeNull(); + refs!.ShouldContain("40001"); + }, duration: Timeout); + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + var countBeforeRedeploy = factory.SubscribeCount; + + // Redeploy SAME composition (new rev). d1 is NOT in _discoveredByDriver ⇒ the bulk loop sends it once and + // the re-inject tail skips it. + var deploymentId2 = SeedDeploymentWithEquipmentTags(db, RevB, + (Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed")); + actor.Tell(new DispatchDeployment(deploymentId2, RevB, CorrelationId.NewId())); + coordinator.ExpectMsg(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied); + publish.ExpectMsg(Timeout); + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // no materialise — d1 was never cached + + AwaitAssert(() => + { + var refs = factory.LastSubscribedRefs; + refs.ShouldNotBeNull(); + refs!.ShouldContain("40001"); + refs.Count.ShouldBe(1); + factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1); + }, duration: Timeout); + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + factory.SubscribeCount.ShouldBe(countBeforeRedeploy + 1); + } + /// Spawns the host with the subscribing driver factory + a publish probe, dispatches the /// deployment, and waits for the Applied ACK so the apply (and thus _lastComposition + the live /// child + the initial SubscribeBulk pass) has completed before the test injects discovered nodes. A