using Akka.Actor; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; /// /// Covers the bounded post-connect re-discovery loop: when an driver /// reaches Connected, runs repeated discovery passes (FOCAS-style: /// the FixedTree is suppressed until the driver's cache populates ~0–2s after connect) and ships each /// pass's captured nodes to its parent as . The /// loop STOPS once the non-empty discovered set stabilises (or the attempt cap is hit) — it must not /// spin forever. A driver that does not implement produces no passes at all. /// [Trait("Category", "Unit")] public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase { /// /// A discoverable driver whose first two passes yield nothing (cache still warming) and whose third /// pass onward yields a stable 3-node set: the actor ships every pass, then STOPS once the non-empty /// set repeats. The final carries the 3 nodes /// and no further passes arrive — proving the loop is bounded. /// [Fact] public void Discovery_retries_until_set_stabilises_then_stops() { var driver = new DiscoverableStubDriver(); var parent = CreateTestProbe(); // Tiny interval so the bounded retry runs in well under a second (no real-time waits). var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); // Drive Connecting → Connected; the Connected entry kicks discovery. actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); // Each discovery pass publishes one DiscoveredNodesReady. The fake stabilises after pass 4 // (passes: 0,0,3,3), so exactly 4 messages arrive, then the stream stops. var msgs = new List(); for (var i = 0; i < 4; i++) msgs.Add(parent.ExpectMsg(TimeSpan.FromSeconds(2))); // The loop must STOP once the non-empty set has stabilised — no fifth pass. parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); // Early passes were empty (FixedTree cache still populating). msgs[0].Nodes.Count.ShouldBe(0); msgs[1].Nodes.Count.ShouldBe(0); // The set then appears and stabilises at 3 nodes. msgs[2].Nodes.Count.ShouldBe(3); var final = msgs[^1]; final.Nodes.Count.ShouldBe(3); final.DriverInstanceId.ShouldBe(driver.DriverInstanceId); final.Nodes.Select(n => n.FullReference).ShouldBe(new[] { "m.fixed.v0", "m.fixed.v1", "m.fixed.v2" }); // The driver was asked exactly as many times as messages published — no extra zombie pass. driver.DiscoverCount.ShouldBe(4); } /// A driver that does not implement produces no discovery passes — /// the Connected entry's discovery kick is a no-op, so the parent receives no /// . [Fact] public void Driver_without_ITagDiscovery_produces_no_discovery() { var driver = new SubscribableStubDriver(); // IDriver + ISubscribable, NOT ITagDiscovery var parent = CreateTestProbe(); var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); // No discovery capability ⇒ never any DiscoveredNodesReady to the parent. parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); } /// /// Discovery RE-RUNS on every return to Connected: after the initial discovery settles, a /// drives the actor through Reconnecting and /// back to Connected (via the auto-retry timer, the same path the existing reconnect tests use), /// and a fresh bounded discovery loop fires — keeping the injected tree current if the backend's /// capabilities changed across the reconnect. The new init bumps the generation, so any /// pre-reconnect tick is discarded by the generation guard (the initial loop has already settled /// here, so none are in flight). /// [Fact] public void Discovery_reruns_after_reconnect() { var driver = new DiscoverableStubDriver(); var parent = CreateTestProbe(); // Tiny reconnect + rediscover intervals so the whole reconnect-then-rediscover cycle runs fast. var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, reconnectInterval: TimeSpan.FromMilliseconds(50), rediscoverInterval: TimeSpan.FromMilliseconds(20))); actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); // Drain the initial settling passes (0,0,3,3) and confirm the first loop stopped. for (var i = 0; i < 4; i++) parent.ExpectMsg(TimeSpan.FromSeconds(2)); parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); var passesBeforeReconnect = driver.DiscoverCount; // 4 // Force a reconnect: Connected → Reconnecting → (auto retry-connect) → Connected again. actor.Tell(new DriverInstanceActor.ForceReconnect()); // A fresh discovery pass must arrive after the reconnect — the cache is warm now, so it sees // the stable 3-node set immediately. var afterReconnect = parent.ExpectMsg(TimeSpan.FromSeconds(3)); afterReconnect.Nodes.Count.ShouldBe(3); afterReconnect.DriverInstanceId.ShouldBe(driver.DriverInstanceId); // The driver was discovered again — proves a fresh loop ran, not a replay of the old one. driver.DiscoverCount.ShouldBeGreaterThan(passesBeforeReconnect); } /// /// Regression for the Critical: a driver whose DiscoverAsync completes ASYNCHRONOUSLY (off the /// actor thread) must still ship . The handler /// touches Context.Parent + Timers AFTER awaiting discovery; if it awaited with /// ConfigureAwait(false) the continuation would resume off the actor context and those calls /// would throw NotSupportedException("no active ActorContext") — the handler would fault and no /// message would arrive. Synchronous (Task.CompletedTask) stubs mask the bug; this one forces a /// genuine off-context resume (modelled on SubscribableStubDriver.UnsubscribeYields). /// [Fact] public void Async_completing_discovery_resumes_on_actor_context_and_publishes() { var driver = new YieldingDiscoverableStubDriver(); var parent = CreateTestProbe(); var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); // With the fix the handler resumes on the actor context, so the publish succeeds and the parent gets // a non-empty set. Without it the handler faults at Context.Parent.Tell and this times out. var published = parent.ExpectMsg(TimeSpan.FromSeconds(2)); published.Nodes.Count.ShouldBe(3); published.DriverInstanceId.ShouldBe(driver.DriverInstanceId); } /// /// The attempt cap bounds a discovered set that never stabilises: a driver whose set keeps GROWING /// (1,2,3,…) never repeats its signature, so the loop is stopped only by /// rediscoverMaxAttempts. With a cap of 3, exactly 3 passes are published, then the stream stops. /// [Fact] public void Never_stabilising_discovery_is_bounded_by_the_attempt_cap() { var driver = new GrowingDiscoverableStubDriver(); var parent = CreateTestProbe(); var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, rediscoverInterval: TimeSpan.FromMilliseconds(20), rediscoverMaxAttempts: 3)); actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); var msgs = new List(); for (var i = 0; i < 3; i++) msgs.Add(parent.ExpectMsg(TimeSpan.FromSeconds(2))); // Cap reached — no fourth pass even though the set never stabilised. parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); // The set genuinely kept growing across the capped passes (1,2,3 nodes). msgs.Select(m => m.Nodes.Count).ShouldBe(new[] { 1, 2, 3 }); driver.DiscoverCount.ShouldBe(3); } /// /// A driver whose is /// opts out of post-connect discovery entirely: the /// Connected entry's discovery kick returns before scheduling the first tick, so the driver is never /// asked to discover and the parent receives no . /// [Fact] public void Discovery_policy_Never_runs_no_passes_and_publishes_nothing() { var driver = new DiscoverableStubDriver(DiscoveryRediscoverPolicy.Never); var parent = CreateTestProbe(); var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); // Connect happened (the discovery decision is made on the Connected entry)... AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); // ...but policy=Never ⇒ no discovery pass is ever run and nothing is published. parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); driver.DiscoverCount.ShouldBe(0); } /// /// A driver whose is /// runs EXACTLY one post-connect pass even when its /// discovered set would keep growing forever — under UntilStable the never-repeating signature /// would retry to the attempt cap. Exactly one /// is published and no further RediscoverTick is scheduled. /// [Fact] public void Discovery_policy_Once_publishes_exactly_one_pass_even_when_set_keeps_growing() { var driver = new GrowingDiscoverableStubDriver(DiscoveryRediscoverPolicy.Once); var parent = CreateTestProbe(); var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); // Exactly one pass is published (the first, growing set → 1 node)... var only = parent.ExpectMsg(TimeSpan.FromSeconds(2)); only.Nodes.Count.ShouldBe(1); only.DriverInstanceId.ShouldBe(driver.DriverInstanceId); // ...and NO second tick is scheduled, even though the set would keep growing under UntilStable. parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); driver.DiscoverCount.ShouldBe(1); } /// /// The per-pass discovery timeout is injectable via so tests /// can control it without real-time delays. The default constant must be 30 seconds (behaviour-preserving). /// Wiring is verified by constructing via Props with a custom value and confirming the actor starts /// and begins discovery normally. /// [Fact] public void Discovery_timeout_default_constant_is_30s_and_Props_accepts_custom_value() { // The constant must exist and preserve the pre-refactor 30 s literal. DriverInstanceActor.DefaultRediscoverDiscoverTimeout.ShouldBe(TimeSpan.FromSeconds(30)); // Props must accept the new optional parameter — no throw and actor starts normally. var driver = new DiscoverableStubDriver(); var parent = CreateTestProbe(); var actor = parent.ChildActorOf(DriverInstanceActor.Props( driver, rediscoverInterval: TimeSpan.FromMilliseconds(20), rediscoverDiscoverTimeout: TimeSpan.FromSeconds(5))); actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); // Actor starts and discovery publishes — confirms the custom timeout was wired without error. parent.ExpectMsg(TimeSpan.FromSeconds(2)); } /// /// A that also exposes . Each DiscoverAsync /// pass is counted; passes 1–2 yield nothing (cache warming), passes 3+ yield a stable 3-node set — /// modelling FOCAS, whose FixedTree appears once a few seconds after connect and then stays put. /// private sealed class DiscoverableStubDriver : StubDriver, ITagDiscovery { private int _passCount; /// Constructs the fake reporting the given ; /// defaults to (the interface default) so the /// existing UntilStable tests are unaffected. public DiscoverableStubDriver(DiscoveryRediscoverPolicy policy = DiscoveryRediscoverPolicy.UntilStable) => RediscoverPolicy = policy; /// The post-connect re-discovery policy this fake reports to the actor. public DiscoveryRediscoverPolicy RediscoverPolicy { get; } /// Number of passes the actor has driven. public int DiscoverCount => Volatile.Read(ref _passCount); /// Streams a growing-then-stable node set into the builder (0,0,3,3,…). public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) { var pass = Interlocked.Increment(ref _passCount); // 1-based pass number var count = pass >= 3 ? 3 : 0; var fixedTree = builder.Folder("FixedTree", "FixedTree"); for (var i = 0; i < count; i++) { fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo( FullName: $"m.fixed.v{i}", DriverDataType: DriverDataType.Float64, IsArray: false, ArrayDim: null, SecurityClass: SecurityClassification.ViewOnly, IsHistorized: false)); } return Task.CompletedTask; } } /// /// A discoverable driver whose DiscoverAsync genuinely SUSPENDS and resumes on a fresh /// thread-pool thread that carries NO Akka actor cell — modelled on /// SubscribableStubDriver.UnsubscribeYields. This forces the actor's await DiscoverAsync(...) /// continuation to resume off-context unless the handler omits ConfigureAwait(false), so it is a /// deterministic repro of the no-ActorContext race. Returns a stable 3-node set on every pass. /// private sealed class YieldingDiscoverableStubDriver : StubDriver, ITagDiscovery { /// Suspends on a TCS completed from a background thread, then streams 3 nodes. public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _ = Task.Run(() => tcs.SetResult(), cancellationToken); await tcs.Task.ConfigureAwait(false); // resume on a clean thread-pool thread (no actor cell) var fixedTree = builder.Folder("FixedTree", "FixedTree"); for (var i = 0; i < 3; i++) { fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo( FullName: $"m.fixed.v{i}", DriverDataType: DriverDataType.Float64, IsArray: false, ArrayDim: null, SecurityClass: SecurityClassification.ViewOnly, IsHistorized: false)); } } } /// /// A discoverable driver whose set NEVER stabilises: pass N yields N nodes (1,2,3,…), so the /// full-reference signature differs every pass and the loop can only be bounded by the attempt cap. /// private sealed class GrowingDiscoverableStubDriver : StubDriver, ITagDiscovery { private int _passCount; /// Constructs the fake reporting the given ; /// defaults to (the interface default) so the /// existing attempt-cap test is unaffected. With the /// ever-growing set proves the actor stops after a single pass (UntilStable would keep retrying). public GrowingDiscoverableStubDriver(DiscoveryRediscoverPolicy policy = DiscoveryRediscoverPolicy.UntilStable) => RediscoverPolicy = policy; /// The post-connect re-discovery policy this fake reports to the actor. public DiscoveryRediscoverPolicy RediscoverPolicy { get; } /// Number of passes the actor has driven. public int DiscoverCount => Volatile.Read(ref _passCount); /// Streams an ever-growing node set (pass N → N nodes). public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) { var pass = Interlocked.Increment(ref _passCount); // 1-based pass number var fixedTree = builder.Folder("FixedTree", "FixedTree"); for (var i = 0; i < pass; i++) { fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo( FullName: $"m.fixed.v{i}", DriverDataType: DriverDataType.Float64, IsArray: false, ArrayDim: null, SecurityClass: SecurityClassification.ViewOnly, IsHistorized: false)); } return Task.CompletedTask; } } }