using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
///
/// Covers the bounded post-connect re-discovery loop: when an driver
/// reaches Connected, runs repeated discovery passes (FOCAS-style:
/// the FixedTree is suppressed until the driver's cache populates ~0–2s after connect) and ships each
/// pass's captured nodes to its parent as . The
/// loop STOPS once the non-empty discovered set stabilises (or the attempt cap is hit) — it must not
/// spin forever. A driver that does not implement produces no passes at all.
///
[Trait("Category", "Unit")]
public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase
{
///
/// A discoverable driver whose first two passes yield nothing (cache still warming) and whose third
/// pass onward yields a stable 3-node set: the actor ships every pass, then STOPS once the non-empty
/// set repeats. The final carries the 3 nodes
/// and no further passes arrive — proving the loop is bounded.
///
[Fact]
public void Discovery_retries_until_set_stabilises_then_stops()
{
var driver = new DiscoverableStubDriver();
var parent = CreateTestProbe();
// Tiny interval so the bounded retry runs in well under a second (no real-time waits).
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20)));
// Drive Connecting → Connected; the Connected entry kicks discovery.
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// Each discovery pass publishes one DiscoveredNodesReady. The fake stabilises after pass 4
// (passes: 0,0,3,3), so exactly 4 messages arrive, then the stream stops.
var msgs = new List();
for (var i = 0; i < 4; i++)
msgs.Add(parent.ExpectMsg(TimeSpan.FromSeconds(2)));
// The loop must STOP once the non-empty set has stabilised — no fifth pass.
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
// Early passes were empty (FixedTree cache still populating).
msgs[0].Nodes.Count.ShouldBe(0);
msgs[1].Nodes.Count.ShouldBe(0);
// The set then appears and stabilises at 3 nodes.
msgs[2].Nodes.Count.ShouldBe(3);
var final = msgs[^1];
final.Nodes.Count.ShouldBe(3);
final.DriverInstanceId.ShouldBe(driver.DriverInstanceId);
final.Nodes.Select(n => n.FullReference).ShouldBe(new[] { "m.fixed.v0", "m.fixed.v1", "m.fixed.v2" });
// The driver was asked exactly as many times as messages published — no extra zombie pass.
driver.DiscoverCount.ShouldBe(4);
}
/// A driver that does not implement produces no discovery passes —
/// the Connected entry's discovery kick is a no-op, so the parent receives no
/// .
[Fact]
public void Driver_without_ITagDiscovery_produces_no_discovery()
{
var driver = new SubscribableStubDriver(); // IDriver + ISubscribable, NOT ITagDiscovery
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
// No discovery capability ⇒ never any DiscoveredNodesReady to the parent.
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
}
///
/// Discovery RE-RUNS on every return to Connected: after the initial discovery settles, a
/// drives the actor through Reconnecting and
/// back to Connected (via the auto-retry timer, the same path the existing reconnect tests use),
/// and a fresh bounded discovery loop fires — keeping the injected tree current if the backend's
/// capabilities changed across the reconnect. The new init bumps the generation, so any
/// pre-reconnect tick is discarded by the generation guard (the initial loop has already settled
/// here, so none are in flight).
///
[Fact]
public void Discovery_reruns_after_reconnect()
{
var driver = new DiscoverableStubDriver();
var parent = CreateTestProbe();
// Tiny reconnect + rediscover intervals so the whole reconnect-then-rediscover cycle runs fast.
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
driver,
reconnectInterval: TimeSpan.FromMilliseconds(50),
rediscoverInterval: TimeSpan.FromMilliseconds(20)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// Drain the initial settling passes (0,0,3,3) and confirm the first loop stopped.
for (var i = 0; i < 4; i++)
parent.ExpectMsg(TimeSpan.FromSeconds(2));
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
var passesBeforeReconnect = driver.DiscoverCount; // 4
// Force a reconnect: Connected → Reconnecting → (auto retry-connect) → Connected again.
actor.Tell(new DriverInstanceActor.ForceReconnect());
// A fresh discovery pass must arrive after the reconnect — the cache is warm now, so it sees
// the stable 3-node set immediately.
var afterReconnect = parent.ExpectMsg(TimeSpan.FromSeconds(3));
afterReconnect.Nodes.Count.ShouldBe(3);
afterReconnect.DriverInstanceId.ShouldBe(driver.DriverInstanceId);
// The driver was discovered again — proves a fresh loop ran, not a replay of the old one.
driver.DiscoverCount.ShouldBeGreaterThan(passesBeforeReconnect);
}
///
/// Regression for the Critical: a driver whose DiscoverAsync completes ASYNCHRONOUSLY (off the
/// actor thread) must still ship . The handler
/// touches Context.Parent + Timers AFTER awaiting discovery; if it awaited with
/// ConfigureAwait(false) the continuation would resume off the actor context and those calls
/// would throw NotSupportedException("no active ActorContext") — the handler would fault and no
/// message would arrive. Synchronous (Task.CompletedTask) stubs mask the bug; this one forces a
/// genuine off-context resume (modelled on SubscribableStubDriver.UnsubscribeYields).
///
[Fact]
public void Async_completing_discovery_resumes_on_actor_context_and_publishes()
{
var driver = new YieldingDiscoverableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// With the fix the handler resumes on the actor context, so the publish succeeds and the parent gets
// a non-empty set. Without it the handler faults at Context.Parent.Tell and this times out.
var published = parent.ExpectMsg(TimeSpan.FromSeconds(2));
published.Nodes.Count.ShouldBe(3);
published.DriverInstanceId.ShouldBe(driver.DriverInstanceId);
}
///
/// The attempt cap bounds a discovered set that never stabilises: a driver whose set keeps GROWING
/// (1,2,3,…) never repeats its signature, so the loop is stopped only by
/// rediscoverMaxAttempts. With a cap of 3, exactly 3 passes are published, then the stream stops.
///
[Fact]
public void Never_stabilising_discovery_is_bounded_by_the_attempt_cap()
{
var driver = new GrowingDiscoverableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20), rediscoverMaxAttempts: 3));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
var msgs = new List();
for (var i = 0; i < 3; i++)
msgs.Add(parent.ExpectMsg(TimeSpan.FromSeconds(2)));
// Cap reached — no fourth pass even though the set never stabilised.
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
// The set genuinely kept growing across the capped passes (1,2,3 nodes).
msgs.Select(m => m.Nodes.Count).ShouldBe(new[] { 1, 2, 3 });
driver.DiscoverCount.ShouldBe(3);
}
///
/// The per-pass discovery timeout is injectable via so tests
/// can control it without real-time delays. The default constant must be 30 seconds (behaviour-preserving).
/// Wiring is verified by constructing via Props with a custom value and confirming the actor starts
/// and begins discovery normally.
///
[Fact]
public void Discovery_timeout_default_constant_is_30s_and_Props_accepts_custom_value()
{
// The constant must exist and preserve the pre-refactor 30 s literal.
DriverInstanceActor.DefaultRediscoverDiscoverTimeout.ShouldBe(TimeSpan.FromSeconds(30));
// Props must accept the new optional parameter — no throw and actor starts normally.
var driver = new DiscoverableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
driver,
rediscoverInterval: TimeSpan.FromMilliseconds(20),
rediscoverDiscoverTimeout: TimeSpan.FromSeconds(5)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// Actor starts and discovery publishes — confirms the custom timeout was wired without error.
parent.ExpectMsg(TimeSpan.FromSeconds(2));
}
///
/// A that also exposes . Each DiscoverAsync
/// pass is counted; passes 1–2 yield nothing (cache warming), passes 3+ yield a stable 3-node set —
/// modelling FOCAS, whose FixedTree appears once a few seconds after connect and then stays put.
///
private sealed class DiscoverableStubDriver : StubDriver, ITagDiscovery
{
private int _passCount;
/// Number of passes the actor has driven.
public int DiscoverCount => Volatile.Read(ref _passCount);
/// Streams a growing-then-stable node set into the builder (0,0,3,3,…).
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
var pass = Interlocked.Increment(ref _passCount); // 1-based pass number
var count = pass >= 3 ? 3 : 0;
var fixedTree = builder.Folder("FixedTree", "FixedTree");
for (var i = 0; i < count; i++)
{
fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo(
FullName: $"m.fixed.v{i}",
DriverDataType: DriverDataType.Float64,
IsArray: false,
ArrayDim: null,
SecurityClass: SecurityClassification.ViewOnly,
IsHistorized: false));
}
return Task.CompletedTask;
}
}
///
/// A discoverable driver whose DiscoverAsync genuinely SUSPENDS and resumes on a fresh
/// thread-pool thread that carries NO Akka actor cell — modelled on
/// SubscribableStubDriver.UnsubscribeYields. This forces the actor's await DiscoverAsync(...)
/// continuation to resume off-context unless the handler omits ConfigureAwait(false), so it is a
/// deterministic repro of the no-ActorContext race. Returns a stable 3-node set on every pass.
///
private sealed class YieldingDiscoverableStubDriver : StubDriver, ITagDiscovery
{
/// Suspends on a TCS completed from a background thread, then streams 3 nodes.
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_ = Task.Run(() => tcs.SetResult(), cancellationToken);
await tcs.Task.ConfigureAwait(false); // resume on a clean thread-pool thread (no actor cell)
var fixedTree = builder.Folder("FixedTree", "FixedTree");
for (var i = 0; i < 3; i++)
{
fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo(
FullName: $"m.fixed.v{i}",
DriverDataType: DriverDataType.Float64,
IsArray: false,
ArrayDim: null,
SecurityClass: SecurityClassification.ViewOnly,
IsHistorized: false));
}
}
}
///
/// A discoverable driver whose set NEVER stabilises: pass N yields N nodes (1,2,3,…), so the
/// full-reference signature differs every pass and the loop can only be bounded by the attempt cap.
///
private sealed class GrowingDiscoverableStubDriver : StubDriver, ITagDiscovery
{
private int _passCount;
/// Number of passes the actor has driven.
public int DiscoverCount => Volatile.Read(ref _passCount);
/// Streams an ever-growing node set (pass N → N nodes).
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
var pass = Interlocked.Increment(ref _passCount); // 1-based pass number
var fixedTree = builder.Folder("FixedTree", "FixedTree");
for (var i = 0; i < pass; i++)
{
fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo(
FullName: $"m.fixed.v{i}",
DriverDataType: DriverDataType.Float64,
IsArray: false,
ArrayDim: null,
SecurityClass: SecurityClassification.ViewOnly,
IsHistorized: false));
}
return Task.CompletedTask;
}
}
}