diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index b2ca2708..ebb77b8d 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -32,6 +32,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers { public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10); + /// Default interval between bounded post-connect re-discovery passes. + public static readonly TimeSpan DefaultRediscoverInterval = TimeSpan.FromSeconds(2); + + /// Default cap on the number of post-connect re-discovery passes. + public const int DefaultRediscoverMaxAttempts = 15; + public sealed record InitializeRequested(string DriverConfigJson); public sealed record InitializeSucceeded(int Generation); public sealed record InitializeFailed(string Reason, int Generation); @@ -103,8 +109,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// the parent dedups and injection is idempotent. public sealed record DiscoveredNodesReady(string DriverInstanceId, IReadOnlyList Nodes); - /// Internal self-tick driving bounded post-connect re-discovery (FixedTree populates ~0–2s after connect). - private sealed record RediscoverTick(int Generation, int Attempt, int PreviousCount); + /// Internal self-tick driving bounded post-connect re-discovery (FixedTree populates ~0–2s after connect). + /// is the ordered-distinct full-reference signature of the prior pass's + /// captured set (empty string on the first tick); re-discovery stops once a non-empty set repeats it. + private sealed record RediscoverTick(int Generation, int Attempt, string PreviousSignature); public sealed class RetryConnect { public static readonly RetryConnect Instance = new(); @@ -191,7 +199,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers IDriverHealthPublisher? healthPublisher = null, string? clusterId = null, TimeSpan? rediscoverInterval = null, - int rediscoverMaxAttempts = 15) => + int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts) => Akka.Actor.Props.Create(() => new DriverInstanceActor( driver, reconnectInterval ?? DefaultReconnectInterval, @@ -240,14 +248,14 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers IDriverHealthPublisher? healthPublisher = null, string? clusterId = null, TimeSpan? rediscoverInterval = null, - int rediscoverMaxAttempts = 15) + int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts) { _driver = driver; _driverInstanceId = driver.DriverInstanceId; _clusterId = clusterId ?? string.Empty; _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance; _reconnectInterval = reconnectInterval; - _rediscoverInterval = rediscoverInterval ?? TimeSpan.FromSeconds(2); + _rediscoverInterval = rediscoverInterval ?? DefaultRediscoverInterval; _rediscoverMaxAttempts = rediscoverMaxAttempts; OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1, new KeyValuePair("event", startStubbed ? "spawn_stub" : "spawn"), @@ -286,6 +294,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(_ => { /* stubbed drivers don't disconnect */ }); Receive(_ => { /* stubbed drivers don't reconnect */ }); Receive(StoreDesiredSubscriptions); + // Stubbed drivers never enter Connected, so they never kick discovery; swallow defensively in case a + // re-discovery self-tick is ever routed here so it doesn't surface as an Akka Unhandled message. + Receive(_ => { }); Receive(_ => PublishHealthSnapshot()); } @@ -339,6 +350,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into // this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes. Receive(_ => { }); + // Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an + // already-queued disconnect; swallow it — the next Connected entry re-kicks discovery. + Receive(_ => { }); Receive(_ => PublishHealthSnapshot()); } @@ -444,6 +458,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into // this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes. Receive(_ => { }); + // Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an + // already-queued disconnect; swallow it — the next Connected entry re-kicks discovery. + Receive(_ => { }); Receive(_ => PublishHealthSnapshot()); Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval); } @@ -712,52 +729,67 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// Kick the bounded post-connect re-discovery loop on a Connected entry. A no-op unless the /// driver exposes (nothing to inject otherwise). Self-sends the first /// tagged with the current init generation so a tick that outlives a reconnect - /// is rejected by the generation guard in . + /// is rejected by the generation guard in . + /// Generic by design: re-discovery runs for EVERY driver on each + /// (re)connect, bounded by stop-on-stable (the discovered-set signature repeats) + the attempt cap. + /// Narrowing this to opt-in for heavy network drivers (Galaxy / OpcUaClient) is a follow-up. private void StartDiscovery() { if (_driver is not ITagDiscovery) return; // driver doesn't expose discovery — nothing to inject - Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousCount: -1)); + Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousSignature: string.Empty)); } /// Runs one post-connect discovery pass: captures the driver's streamed FixedTree via a /// and ships the result to the parent as /// (empty/duplicate sets are fine — the parent dedups and injection - /// is idempotent). Retries on the until the non-empty discovered set - /// has STABILISED (same count two passes running) or the cap is hit, - /// whichever comes first; keeps retrying while empty because a FOCAS-style FixedTree cache may still be - /// populating. The generation guard (checked before and again after the await) drops a tick from a - /// superseded (re)connect so a stale loop cannot resurrect or double-ship. - /// Limitation: this assumes a driver's discovered set only GROWS toward a stable size (true for + /// is idempotent). Retries on the until the non-empty discovered SET + /// has STABILISED (the ordered-distinct full-reference signature repeats — robust for incremental/paged + /// browsers where a count alone could falsely settle a partial tree) or the + /// cap is hit, whichever comes first; keeps retrying while empty because a FOCAS-style FixedTree cache may + /// still be populating. + /// Limitation: this assumes a driver's discovered set only GROWS toward a stable shape (true for /// FOCAS — its FixedTree appears once, and on the wonder deploy the driver-config _options.Tags is /// empty so the set is 0 until the cache populates). A driver that emits an initial non-empty set and /// later grows could stop early on a transient repeat; acceptable for current scope. private async Task HandleRediscoverAsync(RediscoverTick tick) { - if (tick.Generation != _initGeneration) return; // stale (a reconnect happened) + if (tick.Generation != _initGeneration) return; // stale (a reconnect superseded this pass) if (_driver is not ITagDiscovery discovery) return; IReadOnlyList nodes; try { var builder = new CapturingAddressSpaceBuilder(); - await discovery.DiscoverAsync(builder, CancellationToken.None).ConfigureAwait(false); - nodes = builder.Nodes; + // Bound the browse — ReceiveAsync suspends the mailbox for the whole handler, so an unbounded + // DiscoverAsync would block DisconnectObserved / ForceReconnect / writes / health-poll behind it. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + // NO ConfigureAwait(false): a genuinely-async DiscoverAsync (Galaxy / OpcUaClient / TwinCAT) must + // resume on the actor task scheduler so the Context.Parent.Tell + Timers calls below run with a + // live ActorContext. ConfigureAwait(false) would resume off-context and throw + // NotSupportedException("no active ActorContext") — see the same warning on HandleSubscribeAsync. + await discovery.DiscoverAsync(builder, cts.Token); + nodes = builder.Nodes.ToArray(); // immutable snapshot — never hand the builder's live list across actors } catch (Exception ex) { - _log.Debug(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt); + _log.Warning(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt); nodes = Array.Empty(); } - if (tick.Generation != _initGeneration) return; // re-check after the await (state may have changed) + // Belt-and-suspenders: under ReceiveAsync the mailbox is suspended for the whole handler, so + // _initGeneration cannot change mid-await — the pre-await guard + Timers.Cancel("rediscover") on + // disconnect + single-timer key reuse are the primary protections. Re-checked in case that changes. + if (tick.Generation != _initGeneration) return; Context.Parent.Tell(new DiscoveredNodesReady(_driverInstanceId, nodes)); - // Stop when the non-empty discovered set has stabilised, or the attempt cap is hit. Keep retrying - // while empty (FixedTree cache may still be populating). PreviousCount=-1 on the first pass. - var stableNonEmpty = nodes.Count > 0 && nodes.Count == tick.PreviousCount; + // Stop when the non-empty discovered SET has stabilised (its signature repeats), or the attempt cap + // is hit. Keep retrying while empty (a FixedTree cache may still be populating). First tick carries "". + var signature = string.Join('\u0001', + nodes.Select(n => n.FullReference).Distinct(StringComparer.Ordinal).OrderBy(x => x, StringComparer.Ordinal)); + var stableNonEmpty = nodes.Count > 0 && string.Equals(signature, tick.PreviousSignature, StringComparison.Ordinal); if (tick.Attempt + 1 < _rediscoverMaxAttempts && !stableNonEmpty) - Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, tick.Attempt + 1, nodes.Count), _rediscoverInterval); + Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, tick.Attempt + 1, signature), _rediscoverInterval); else _log.Debug("DriverInstance {Id}: discovery settled after {Attempt} pass(es), {Count} node(s)", _driverInstanceId, tick.Attempt + 1, nodes.Count); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorDiscoveryTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorDiscoveryTests.cs index f880fc3e..e319cc66 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorDiscoveryTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorDiscoveryTests.cs @@ -118,6 +118,59 @@ public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase driver.DiscoverCount.ShouldBeGreaterThan(passesBeforeReconnect); } + /// + /// Regression for the Critical: a driver whose DiscoverAsync completes ASYNCHRONOUSLY (off the + /// actor thread) must still ship . The handler + /// touches Context.Parent + Timers AFTER awaiting discovery; if it awaited with + /// ConfigureAwait(false) the continuation would resume off the actor context and those calls + /// would throw NotSupportedException("no active ActorContext") — the handler would fault and no + /// message would arrive. Synchronous (Task.CompletedTask) stubs mask the bug; this one forces a + /// genuine off-context resume (modelled on SubscribableStubDriver.UnsubscribeYields). + /// + [Fact] + public void Async_completing_discovery_resumes_on_actor_context_and_publishes() + { + var driver = new YieldingDiscoverableStubDriver(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props( + driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + + // With the fix the handler resumes on the actor context, so the publish succeeds and the parent gets + // a non-empty set. Without it the handler faults at Context.Parent.Tell and this times out. + var published = parent.ExpectMsg(TimeSpan.FromSeconds(2)); + published.Nodes.Count.ShouldBe(3); + published.DriverInstanceId.ShouldBe(driver.DriverInstanceId); + } + + /// + /// The attempt cap bounds a discovered set that never stabilises: a driver whose set keeps GROWING + /// (1,2,3,…) never repeats its signature, so the loop is stopped only by + /// rediscoverMaxAttempts. With a cap of 3, exactly 3 passes are published, then the stream stops. + /// + [Fact] + public void Never_stabilising_discovery_is_bounded_by_the_attempt_cap() + { + var driver = new GrowingDiscoverableStubDriver(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props( + driver, rediscoverInterval: TimeSpan.FromMilliseconds(20), rediscoverMaxAttempts: 3)); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + + var msgs = new List(); + for (var i = 0; i < 3; i++) + msgs.Add(parent.ExpectMsg(TimeSpan.FromSeconds(2))); + + // Cap reached — no fourth pass even though the set never stabilised. + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + + // The set genuinely kept growing across the capped passes (1,2,3 nodes). + msgs.Select(m => m.Nodes.Count).ShouldBe(new[] { 1, 2, 3 }); + driver.DiscoverCount.ShouldBe(3); + } + /// /// A that also exposes . Each DiscoverAsync /// pass is counted; passes 1–2 yield nothing (cache warming), passes 3+ yield a stable 3-node set — @@ -149,4 +202,63 @@ public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase return Task.CompletedTask; } } + + /// + /// A discoverable driver whose DiscoverAsync genuinely SUSPENDS and resumes on a fresh + /// thread-pool thread that carries NO Akka actor cell — modelled on + /// SubscribableStubDriver.UnsubscribeYields. This forces the actor's await DiscoverAsync(...) + /// continuation to resume off-context unless the handler omits ConfigureAwait(false), so it is a + /// deterministic repro of the no-ActorContext race. Returns a stable 3-node set on every pass. + /// + private sealed class YieldingDiscoverableStubDriver : StubDriver, ITagDiscovery + { + /// Suspends on a TCS completed from a background thread, then streams 3 nodes. + public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _ = Task.Run(() => tcs.SetResult(), cancellationToken); + await tcs.Task.ConfigureAwait(false); // resume on a clean thread-pool thread (no actor cell) + var fixedTree = builder.Folder("FixedTree", "FixedTree"); + for (var i = 0; i < 3; i++) + { + fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo( + FullName: $"m.fixed.v{i}", + DriverDataType: DriverDataType.Float64, + IsArray: false, + ArrayDim: null, + SecurityClass: SecurityClassification.ViewOnly, + IsHistorized: false)); + } + } + } + + /// + /// A discoverable driver whose set NEVER stabilises: pass N yields N nodes (1,2,3,…), so the + /// full-reference signature differs every pass and the loop can only be bounded by the attempt cap. + /// + private sealed class GrowingDiscoverableStubDriver : StubDriver, ITagDiscovery + { + private int _passCount; + + /// Number of passes the actor has driven. + public int DiscoverCount => Volatile.Read(ref _passCount); + + /// Streams an ever-growing node set (pass N → N nodes). + public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) + { + var pass = Interlocked.Increment(ref _passCount); // 1-based pass number + var fixedTree = builder.Folder("FixedTree", "FixedTree"); + for (var i = 0; i < pass; i++) + { + fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo( + FullName: $"m.fixed.v{i}", + DriverDataType: DriverDataType.Float64, + IsArray: false, + ArrayDim: null, + SecurityClass: SecurityClassification.ViewOnly, + IsHistorized: false)); + } + return Task.CompletedTask; + } + } }