feat(otopcua): driver-instance post-connect bounded re-discovery
This commit is contained in:
@@ -98,6 +98,13 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// subscription that un-gates an <see cref="IAlarmSource"/> driver's feed. Handled async so the
|
||||
/// <see cref="IAlarmSource.SubscribeAlarmsAsync"/> call is bounded + off the synchronous handlers.</summary>
|
||||
private sealed record SubscribeAlarms;
|
||||
/// <summary>Published to the parent (DriverHostActor) after each post-connect discovery pass so it can
|
||||
/// graft the driver's discovered FixedTree nodes under the equipment. Empty/duplicate sets are fine —
|
||||
/// the parent dedups and injection is idempotent.</summary>
|
||||
public sealed record DiscoveredNodesReady(string DriverInstanceId, IReadOnlyList<DiscoveredNode> Nodes);
|
||||
|
||||
/// <summary>Internal self-tick driving bounded post-connect re-discovery (FixedTree populates ~0–2s after connect).</summary>
|
||||
private sealed record RediscoverTick(int Generation, int Attempt, int PreviousCount);
|
||||
public sealed class RetryConnect
|
||||
{
|
||||
public static readonly RetryConnect Instance = new();
|
||||
@@ -112,6 +119,14 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
private readonly string _clusterId;
|
||||
private readonly IDriverHealthPublisher _healthPublisher;
|
||||
private readonly TimeSpan _reconnectInterval;
|
||||
|
||||
/// <summary>Interval between bounded post-connect re-discovery passes. Production default 2s; tests
|
||||
/// inject a tiny value so the loop runs without real-time waits.</summary>
|
||||
private readonly TimeSpan _rediscoverInterval;
|
||||
|
||||
/// <summary>Cap on the number of post-connect re-discovery passes — a backstop so a never-stabilising
|
||||
/// (or perpetually-empty) discovered set cannot spin the loop forever. Production default 15.</summary>
|
||||
private readonly int _rediscoverMaxAttempts;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private string? _currentConfigJson;
|
||||
|
||||
@@ -167,18 +182,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// stub paths don't need to provide one.</param>
|
||||
/// <param name="clusterId">Optional cluster identifier forwarded in <see cref="DriverHealthChanged"/> messages;
|
||||
/// defaults to an empty string when not provided (e.g. in unit tests).</param>
|
||||
/// <param name="rediscoverInterval">Optional interval between post-connect re-discovery passes; defaults to 2 seconds.</param>
|
||||
/// <param name="rediscoverMaxAttempts">Optional cap on re-discovery passes; defaults to 15.</param>
|
||||
public static Props Props(
|
||||
IDriver driver,
|
||||
TimeSpan? reconnectInterval = null,
|
||||
bool startStubbed = false,
|
||||
IDriverHealthPublisher? healthPublisher = null,
|
||||
string? clusterId = null) =>
|
||||
string? clusterId = null,
|
||||
TimeSpan? rediscoverInterval = null,
|
||||
int rediscoverMaxAttempts = 15) =>
|
||||
Akka.Actor.Props.Create(() => new DriverInstanceActor(
|
||||
driver,
|
||||
reconnectInterval ?? DefaultReconnectInterval,
|
||||
startStubbed,
|
||||
healthPublisher ?? NullDriverHealthPublisher.Instance,
|
||||
clusterId ?? string.Empty));
|
||||
clusterId ?? string.Empty,
|
||||
rediscoverInterval,
|
||||
rediscoverMaxAttempts));
|
||||
|
||||
/// <summary>
|
||||
/// Returns true when the driver should boot in DEV-STUB mode based on host platform and
|
||||
@@ -210,18 +231,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// <param name="startStubbed">If true, start in stub mode for testing or unavailable platforms.</param>
|
||||
/// <param name="healthPublisher">Sink for health-change notifications; must not be null.</param>
|
||||
/// <param name="clusterId">Cluster identifier forwarded in health snapshots.</param>
|
||||
/// <param name="rediscoverInterval">Interval between post-connect re-discovery passes; defaults to 2 seconds.</param>
|
||||
/// <param name="rediscoverMaxAttempts">Cap on the number of re-discovery passes; defaults to 15.</param>
|
||||
public DriverInstanceActor(
|
||||
IDriver driver,
|
||||
TimeSpan reconnectInterval,
|
||||
bool startStubbed = false,
|
||||
IDriverHealthPublisher? healthPublisher = null,
|
||||
string? clusterId = null)
|
||||
string? clusterId = null,
|
||||
TimeSpan? rediscoverInterval = null,
|
||||
int rediscoverMaxAttempts = 15)
|
||||
{
|
||||
_driver = driver;
|
||||
_driverInstanceId = driver.DriverInstanceId;
|
||||
_clusterId = clusterId ?? string.Empty;
|
||||
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
|
||||
_reconnectInterval = reconnectInterval;
|
||||
_rediscoverInterval = rediscoverInterval ?? TimeSpan.FromSeconds(2);
|
||||
_rediscoverMaxAttempts = rediscoverMaxAttempts;
|
||||
OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
|
||||
new KeyValuePair<string, object?>("event", startStubbed ? "spawn_stub" : "spawn"),
|
||||
new KeyValuePair<string, object?>("driver_type", driver.DriverType));
|
||||
@@ -284,6 +311,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
ResubscribeDesired();
|
||||
AttachAlarmSource();
|
||||
SubscribeDesiredAlarms();
|
||||
StartDiscovery();
|
||||
});
|
||||
Receive<InitializeFailed>(msg =>
|
||||
{
|
||||
@@ -321,6 +349,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
{
|
||||
_log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting",
|
||||
_driverInstanceId, msg.Reason);
|
||||
Timers.Cancel("rediscover");
|
||||
DetachSubscription();
|
||||
RecordFault();
|
||||
Become(Reconnecting);
|
||||
@@ -329,10 +358,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
Receive<ForceReconnect>(_ =>
|
||||
{
|
||||
_log.Info("DriverInstance {Id}: ForceReconnect requested by admin; re-entering Reconnecting", _driverInstanceId);
|
||||
Timers.Cancel("rediscover");
|
||||
DetachSubscription();
|
||||
Become(Reconnecting);
|
||||
PublishHealthSnapshot();
|
||||
});
|
||||
ReceiveAsync<RediscoverTick>(HandleRediscoverAsync);
|
||||
ReceiveAsync<WriteAttribute>(HandleWriteAsync);
|
||||
ReceiveAsync<RouteAlarmAck>(HandleAcknowledgeAsync);
|
||||
ReceiveAsync<Subscribe>(HandleSubscribeAsync);
|
||||
@@ -677,6 +708,59 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Kick the bounded post-connect re-discovery loop on a <c>Connected</c> entry. A no-op unless the
|
||||
/// driver exposes <see cref="ITagDiscovery"/> (nothing to inject otherwise). Self-sends the first
|
||||
/// <see cref="RediscoverTick"/> tagged with the current init generation so a tick that outlives a reconnect
|
||||
/// is rejected by the generation guard in <see cref="HandleRediscoverAsync"/>.</summary>
|
||||
private void StartDiscovery()
|
||||
{
|
||||
if (_driver is not ITagDiscovery) return; // driver doesn't expose discovery — nothing to inject
|
||||
Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousCount: -1));
|
||||
}
|
||||
|
||||
/// <summary>Runs one post-connect discovery pass: captures the driver's streamed FixedTree via a
|
||||
/// <see cref="CapturingAddressSpaceBuilder"/> and ships the result to the parent as
|
||||
/// <see cref="DiscoveredNodesReady"/> (empty/duplicate sets are fine — the parent dedups and injection
|
||||
/// is idempotent). Retries on the <see cref="_rediscoverInterval"/> until the non-empty discovered set
|
||||
/// has STABILISED (same count two passes running) or the <see cref="_rediscoverMaxAttempts"/> cap is hit,
|
||||
/// whichever comes first; keeps retrying while empty because a FOCAS-style FixedTree cache may still be
|
||||
/// populating. The generation guard (checked before and again after the await) drops a tick from a
|
||||
/// superseded (re)connect so a stale loop cannot resurrect or double-ship.
|
||||
/// <para>Limitation: this assumes a driver's discovered set only GROWS toward a stable size (true for
|
||||
/// FOCAS — its FixedTree appears once, and on the wonder deploy the driver-config <c>_options.Tags</c> is
|
||||
/// empty so the set is 0 until the cache populates). A driver that emits an initial non-empty set and
|
||||
/// later grows could stop early on a transient repeat; acceptable for current scope.</para></summary>
|
||||
private async Task HandleRediscoverAsync(RediscoverTick tick)
|
||||
{
|
||||
if (tick.Generation != _initGeneration) return; // stale (a reconnect happened)
|
||||
if (_driver is not ITagDiscovery discovery) return;
|
||||
|
||||
IReadOnlyList<DiscoveredNode> nodes;
|
||||
try
|
||||
{
|
||||
var builder = new CapturingAddressSpaceBuilder();
|
||||
await discovery.DiscoverAsync(builder, CancellationToken.None).ConfigureAwait(false);
|
||||
nodes = builder.Nodes;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Debug(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt);
|
||||
nodes = Array.Empty<DiscoveredNode>();
|
||||
}
|
||||
|
||||
if (tick.Generation != _initGeneration) return; // re-check after the await (state may have changed)
|
||||
|
||||
Context.Parent.Tell(new DiscoveredNodesReady(_driverInstanceId, nodes));
|
||||
|
||||
// Stop when the non-empty discovered set has stabilised, or the attempt cap is hit. Keep retrying
|
||||
// while empty (FixedTree cache may still be populating). PreviousCount=-1 on the first pass.
|
||||
var stableNonEmpty = nodes.Count > 0 && nodes.Count == tick.PreviousCount;
|
||||
if (tick.Attempt + 1 < _rediscoverMaxAttempts && !stableNonEmpty)
|
||||
Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, tick.Attempt + 1, nodes.Count), _rediscoverInterval);
|
||||
else
|
||||
_log.Debug("DriverInstance {Id}: discovery settled after {Attempt} pass(es), {Count} node(s)", _driverInstanceId, tick.Attempt + 1, nodes.Count);
|
||||
}
|
||||
|
||||
/// <summary>Records the host's desired subscription set without touching the live subscription.
|
||||
/// The set is (re)applied by <see cref="ResubscribeDesired"/> on the next <c>Connected</c> entry.</summary>
|
||||
private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg)
|
||||
|
||||
+111
@@ -0,0 +1,111 @@
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
|
||||
/// <summary>
|
||||
/// Covers the bounded post-connect re-discovery loop: when an <see cref="ITagDiscovery"/> driver
|
||||
/// reaches Connected, <see cref="DriverInstanceActor"/> runs repeated discovery passes (FOCAS-style:
|
||||
/// the FixedTree is suppressed until the driver's cache populates ~0–2s after connect) and ships each
|
||||
/// pass's captured nodes to its parent as <see cref="DriverInstanceActor.DiscoveredNodesReady"/>. The
|
||||
/// loop STOPS once the non-empty discovered set stabilises (or the attempt cap is hit) — it must not
|
||||
/// spin forever. A driver that does not implement <see cref="ITagDiscovery"/> produces no passes at all.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase
|
||||
{
|
||||
/// <summary>
|
||||
/// A discoverable driver whose first two passes yield nothing (cache still warming) and whose third
|
||||
/// pass onward yields a stable 3-node set: the actor ships every pass, then STOPS once the non-empty
|
||||
/// set repeats. The final <see cref="DriverInstanceActor.DiscoveredNodesReady"/> carries the 3 nodes
|
||||
/// and no further passes arrive — proving the loop is bounded.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void Discovery_retries_until_set_stabilises_then_stops()
|
||||
{
|
||||
var driver = new DiscoverableStubDriver();
|
||||
var parent = CreateTestProbe();
|
||||
// Tiny interval so the bounded retry runs in well under a second (no real-time waits).
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
|
||||
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20)));
|
||||
|
||||
// Drive Connecting → Connected; the Connected entry kicks discovery.
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||
|
||||
// Each discovery pass publishes one DiscoveredNodesReady. The fake stabilises after pass 4
|
||||
// (passes: 0,0,3,3), so exactly 4 messages arrive, then the stream stops.
|
||||
var msgs = new List<DriverInstanceActor.DiscoveredNodesReady>();
|
||||
for (var i = 0; i < 4; i++)
|
||||
msgs.Add(parent.ExpectMsg<DriverInstanceActor.DiscoveredNodesReady>(TimeSpan.FromSeconds(2)));
|
||||
|
||||
// The loop must STOP once the non-empty set has stabilised — no fifth pass.
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
|
||||
// Early passes were empty (FixedTree cache still populating).
|
||||
msgs[0].Nodes.Count.ShouldBe(0);
|
||||
msgs[1].Nodes.Count.ShouldBe(0);
|
||||
// The set then appears and stabilises at 3 nodes.
|
||||
msgs[2].Nodes.Count.ShouldBe(3);
|
||||
var final = msgs[^1];
|
||||
final.Nodes.Count.ShouldBe(3);
|
||||
final.DriverInstanceId.ShouldBe(driver.DriverInstanceId);
|
||||
final.Nodes.Select(n => n.FullReference).ShouldBe(new[] { "m.fixed.v0", "m.fixed.v1", "m.fixed.v2" });
|
||||
|
||||
// The driver was asked exactly as many times as messages published — no extra zombie pass.
|
||||
driver.DiscoverCount.ShouldBe(4);
|
||||
}
|
||||
|
||||
/// <summary>A driver that does not implement <see cref="ITagDiscovery"/> produces no discovery passes —
|
||||
/// the Connected entry's discovery kick is a no-op, so the parent receives no
|
||||
/// <see cref="DriverInstanceActor.DiscoveredNodesReady"/>.</summary>
|
||||
[Fact]
|
||||
public void Driver_without_ITagDiscovery_produces_no_discovery()
|
||||
{
|
||||
var driver = new SubscribableStubDriver(); // IDriver + ISubscribable, NOT ITagDiscovery
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
|
||||
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20)));
|
||||
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
|
||||
|
||||
// No discovery capability ⇒ never any DiscoveredNodesReady to the parent.
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A <see cref="StubDriver"/> that also exposes <see cref="ITagDiscovery"/>. Each <c>DiscoverAsync</c>
|
||||
/// pass is counted; passes 1–2 yield nothing (cache warming), passes 3+ yield a stable 3-node set —
|
||||
/// modelling FOCAS, whose FixedTree appears once a few seconds after connect and then stays put.
|
||||
/// </summary>
|
||||
private sealed class DiscoverableStubDriver : StubDriver, ITagDiscovery
|
||||
{
|
||||
private int _passCount;
|
||||
|
||||
/// <summary>Number of <see cref="DiscoverAsync"/> passes the actor has driven.</summary>
|
||||
public int DiscoverCount => Volatile.Read(ref _passCount);
|
||||
|
||||
/// <summary>Streams a growing-then-stable node set into the builder (0,0,3,3,…).</summary>
|
||||
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
|
||||
{
|
||||
var pass = Interlocked.Increment(ref _passCount); // 1-based pass number
|
||||
var count = pass >= 3 ? 3 : 0;
|
||||
var fixedTree = builder.Folder("FixedTree", "FixedTree");
|
||||
for (var i = 0; i < count; i++)
|
||||
{
|
||||
fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo(
|
||||
FullName: $"m.fixed.v{i}",
|
||||
DriverDataType: DriverDataType.Float64,
|
||||
IsArray: false,
|
||||
ArrayDim: null,
|
||||
SecurityClass: SecurityClassification.ViewOnly,
|
||||
IsHistorized: false));
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user