diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index 062c9c62..c062e7ee 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -98,6 +98,13 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// subscription that un-gates an driver's feed. Handled async so the /// call is bounded + off the synchronous handlers. private sealed record SubscribeAlarms; + /// Published to the parent (DriverHostActor) after each post-connect discovery pass so it can + /// graft the driver's discovered FixedTree nodes under the equipment. Empty/duplicate sets are fine — + /// the parent dedups and injection is idempotent. + public sealed record DiscoveredNodesReady(string DriverInstanceId, IReadOnlyList Nodes); + + /// Internal self-tick driving bounded post-connect re-discovery (FixedTree populates ~0–2s after connect). + private sealed record RediscoverTick(int Generation, int Attempt, int PreviousCount); public sealed class RetryConnect { public static readonly RetryConnect Instance = new(); @@ -112,6 +119,14 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private readonly string _clusterId; private readonly IDriverHealthPublisher _healthPublisher; private readonly TimeSpan _reconnectInterval; + + /// Interval between bounded post-connect re-discovery passes. Production default 2s; tests + /// inject a tiny value so the loop runs without real-time waits. + private readonly TimeSpan _rediscoverInterval; + + /// Cap on the number of post-connect re-discovery passes — a backstop so a never-stabilising + /// (or perpetually-empty) discovered set cannot spin the loop forever. Production default 15. + private readonly int _rediscoverMaxAttempts; private readonly ILoggingAdapter _log = Context.GetLogger(); private string? _currentConfigJson; @@ -167,18 +182,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// stub paths don't need to provide one. /// Optional cluster identifier forwarded in messages; /// defaults to an empty string when not provided (e.g. in unit tests). + /// Optional interval between post-connect re-discovery passes; defaults to 2 seconds. + /// Optional cap on re-discovery passes; defaults to 15. public static Props Props( IDriver driver, TimeSpan? reconnectInterval = null, bool startStubbed = false, IDriverHealthPublisher? healthPublisher = null, - string? clusterId = null) => + string? clusterId = null, + TimeSpan? rediscoverInterval = null, + int rediscoverMaxAttempts = 15) => Akka.Actor.Props.Create(() => new DriverInstanceActor( driver, reconnectInterval ?? DefaultReconnectInterval, startStubbed, healthPublisher ?? NullDriverHealthPublisher.Instance, - clusterId ?? string.Empty)); + clusterId ?? string.Empty, + rediscoverInterval, + rediscoverMaxAttempts)); /// /// Returns true when the driver should boot in DEV-STUB mode based on host platform and @@ -210,18 +231,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// If true, start in stub mode for testing or unavailable platforms. /// Sink for health-change notifications; must not be null. /// Cluster identifier forwarded in health snapshots. + /// Interval between post-connect re-discovery passes; defaults to 2 seconds. + /// Cap on the number of re-discovery passes; defaults to 15. public DriverInstanceActor( IDriver driver, TimeSpan reconnectInterval, bool startStubbed = false, IDriverHealthPublisher? healthPublisher = null, - string? clusterId = null) + string? clusterId = null, + TimeSpan? rediscoverInterval = null, + int rediscoverMaxAttempts = 15) { _driver = driver; _driverInstanceId = driver.DriverInstanceId; _clusterId = clusterId ?? string.Empty; _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance; _reconnectInterval = reconnectInterval; + _rediscoverInterval = rediscoverInterval ?? TimeSpan.FromSeconds(2); + _rediscoverMaxAttempts = rediscoverMaxAttempts; OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1, new KeyValuePair("event", startStubbed ? "spawn_stub" : "spawn"), new KeyValuePair("driver_type", driver.DriverType)); @@ -284,6 +311,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers ResubscribeDesired(); AttachAlarmSource(); SubscribeDesiredAlarms(); + StartDiscovery(); }); Receive(msg => { @@ -321,6 +349,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers { _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting", _driverInstanceId, msg.Reason); + Timers.Cancel("rediscover"); DetachSubscription(); RecordFault(); Become(Reconnecting); @@ -329,10 +358,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(_ => { _log.Info("DriverInstance {Id}: ForceReconnect requested by admin; re-entering Reconnecting", _driverInstanceId); + Timers.Cancel("rediscover"); DetachSubscription(); Become(Reconnecting); PublishHealthSnapshot(); }); + ReceiveAsync(HandleRediscoverAsync); ReceiveAsync(HandleWriteAsync); ReceiveAsync(HandleAcknowledgeAsync); ReceiveAsync(HandleSubscribeAsync); @@ -677,6 +708,59 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers } } + /// Kick the bounded post-connect re-discovery loop on a Connected entry. A no-op unless the + /// driver exposes (nothing to inject otherwise). Self-sends the first + /// tagged with the current init generation so a tick that outlives a reconnect + /// is rejected by the generation guard in . + private void StartDiscovery() + { + if (_driver is not ITagDiscovery) return; // driver doesn't expose discovery — nothing to inject + Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousCount: -1)); + } + + /// Runs one post-connect discovery pass: captures the driver's streamed FixedTree via a + /// and ships the result to the parent as + /// (empty/duplicate sets are fine — the parent dedups and injection + /// is idempotent). Retries on the until the non-empty discovered set + /// has STABILISED (same count two passes running) or the cap is hit, + /// whichever comes first; keeps retrying while empty because a FOCAS-style FixedTree cache may still be + /// populating. The generation guard (checked before and again after the await) drops a tick from a + /// superseded (re)connect so a stale loop cannot resurrect or double-ship. + /// Limitation: this assumes a driver's discovered set only GROWS toward a stable size (true for + /// FOCAS — its FixedTree appears once, and on the wonder deploy the driver-config _options.Tags is + /// empty so the set is 0 until the cache populates). A driver that emits an initial non-empty set and + /// later grows could stop early on a transient repeat; acceptable for current scope. + private async Task HandleRediscoverAsync(RediscoverTick tick) + { + if (tick.Generation != _initGeneration) return; // stale (a reconnect happened) + if (_driver is not ITagDiscovery discovery) return; + + IReadOnlyList nodes; + try + { + var builder = new CapturingAddressSpaceBuilder(); + await discovery.DiscoverAsync(builder, CancellationToken.None).ConfigureAwait(false); + nodes = builder.Nodes; + } + catch (Exception ex) + { + _log.Debug(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt); + nodes = Array.Empty(); + } + + if (tick.Generation != _initGeneration) return; // re-check after the await (state may have changed) + + Context.Parent.Tell(new DiscoveredNodesReady(_driverInstanceId, nodes)); + + // Stop when the non-empty discovered set has stabilised, or the attempt cap is hit. Keep retrying + // while empty (FixedTree cache may still be populating). PreviousCount=-1 on the first pass. + var stableNonEmpty = nodes.Count > 0 && nodes.Count == tick.PreviousCount; + if (tick.Attempt + 1 < _rediscoverMaxAttempts && !stableNonEmpty) + Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, tick.Attempt + 1, nodes.Count), _rediscoverInterval); + else + _log.Debug("DriverInstance {Id}: discovery settled after {Attempt} pass(es), {Count} node(s)", _driverInstanceId, tick.Attempt + 1, nodes.Count); + } + /// Records the host's desired subscription set without touching the live subscription. /// The set is (re)applied by on the next Connected entry. private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg) diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorDiscoveryTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorDiscoveryTests.cs new file mode 100644 index 00000000..a97542c7 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorDiscoveryTests.cs @@ -0,0 +1,111 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Covers the bounded post-connect re-discovery loop: when an driver +/// reaches Connected, runs repeated discovery passes (FOCAS-style: +/// the FixedTree is suppressed until the driver's cache populates ~0–2s after connect) and ships each +/// pass's captured nodes to its parent as . The +/// loop STOPS once the non-empty discovered set stabilises (or the attempt cap is hit) — it must not +/// spin forever. A driver that does not implement produces no passes at all. +/// +[Trait("Category", "Unit")] +public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase +{ + /// + /// A discoverable driver whose first two passes yield nothing (cache still warming) and whose third + /// pass onward yields a stable 3-node set: the actor ships every pass, then STOPS once the non-empty + /// set repeats. The final carries the 3 nodes + /// and no further passes arrive — proving the loop is bounded. + /// + [Fact] + public void Discovery_retries_until_set_stabilises_then_stops() + { + var driver = new DiscoverableStubDriver(); + var parent = CreateTestProbe(); + // Tiny interval so the bounded retry runs in well under a second (no real-time waits). + var actor = parent.ChildActorOf(DriverInstanceActor.Props( + driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); + + // Drive Connecting → Connected; the Connected entry kicks discovery. + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + + // Each discovery pass publishes one DiscoveredNodesReady. The fake stabilises after pass 4 + // (passes: 0,0,3,3), so exactly 4 messages arrive, then the stream stops. + var msgs = new List(); + for (var i = 0; i < 4; i++) + msgs.Add(parent.ExpectMsg(TimeSpan.FromSeconds(2))); + + // The loop must STOP once the non-empty set has stabilised — no fifth pass. + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + + // Early passes were empty (FixedTree cache still populating). + msgs[0].Nodes.Count.ShouldBe(0); + msgs[1].Nodes.Count.ShouldBe(0); + // The set then appears and stabilises at 3 nodes. + msgs[2].Nodes.Count.ShouldBe(3); + var final = msgs[^1]; + final.Nodes.Count.ShouldBe(3); + final.DriverInstanceId.ShouldBe(driver.DriverInstanceId); + final.Nodes.Select(n => n.FullReference).ShouldBe(new[] { "m.fixed.v0", "m.fixed.v1", "m.fixed.v2" }); + + // The driver was asked exactly as many times as messages published — no extra zombie pass. + driver.DiscoverCount.ShouldBe(4); + } + + /// A driver that does not implement produces no discovery passes — + /// the Connected entry's discovery kick is a no-op, so the parent receives no + /// . + [Fact] + public void Driver_without_ITagDiscovery_produces_no_discovery() + { + var driver = new SubscribableStubDriver(); // IDriver + ISubscribable, NOT ITagDiscovery + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props( + driver, rediscoverInterval: TimeSpan.FromMilliseconds(20))); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); + + // No discovery capability ⇒ never any DiscoveredNodesReady to the parent. + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + /// + /// A that also exposes . Each DiscoverAsync + /// pass is counted; passes 1–2 yield nothing (cache warming), passes 3+ yield a stable 3-node set — + /// modelling FOCAS, whose FixedTree appears once a few seconds after connect and then stays put. + /// + private sealed class DiscoverableStubDriver : StubDriver, ITagDiscovery + { + private int _passCount; + + /// Number of passes the actor has driven. + public int DiscoverCount => Volatile.Read(ref _passCount); + + /// Streams a growing-then-stable node set into the builder (0,0,3,3,…). + public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) + { + var pass = Interlocked.Increment(ref _passCount); // 1-based pass number + var count = pass >= 3 ? 3 : 0; + var fixedTree = builder.Folder("FixedTree", "FixedTree"); + for (var i = 0; i < count; i++) + { + fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo( + FullName: $"m.fixed.v{i}", + DriverDataType: DriverDataType.Float64, + IsArray: false, + ArrayDim: null, + SecurityClass: SecurityClassification.ViewOnly, + IsHistorized: false)); + } + return Task.CompletedTask; + } + } +}