fix(otopcua): resume discovery on actor context + bound/harden re-discovery
This commit is contained in:
@@ -32,6 +32,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
{
|
{
|
||||||
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
|
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
|
||||||
|
|
||||||
|
/// <summary>Default interval between bounded post-connect re-discovery passes.</summary>
|
||||||
|
public static readonly TimeSpan DefaultRediscoverInterval = TimeSpan.FromSeconds(2);
|
||||||
|
|
||||||
|
/// <summary>Default cap on the number of post-connect re-discovery passes.</summary>
|
||||||
|
public const int DefaultRediscoverMaxAttempts = 15;
|
||||||
|
|
||||||
public sealed record InitializeRequested(string DriverConfigJson);
|
public sealed record InitializeRequested(string DriverConfigJson);
|
||||||
public sealed record InitializeSucceeded(int Generation);
|
public sealed record InitializeSucceeded(int Generation);
|
||||||
public sealed record InitializeFailed(string Reason, int Generation);
|
public sealed record InitializeFailed(string Reason, int Generation);
|
||||||
@@ -103,8 +109,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
/// the parent dedups and injection is idempotent.</summary>
|
/// the parent dedups and injection is idempotent.</summary>
|
||||||
public sealed record DiscoveredNodesReady(string DriverInstanceId, IReadOnlyList<DiscoveredNode> Nodes);
|
public sealed record DiscoveredNodesReady(string DriverInstanceId, IReadOnlyList<DiscoveredNode> Nodes);
|
||||||
|
|
||||||
/// <summary>Internal self-tick driving bounded post-connect re-discovery (FixedTree populates ~0–2s after connect).</summary>
|
/// <summary>Internal self-tick driving bounded post-connect re-discovery (FixedTree populates ~0–2s after connect).
|
||||||
private sealed record RediscoverTick(int Generation, int Attempt, int PreviousCount);
|
/// <paramref name="PreviousSignature"/> is the ordered-distinct full-reference signature of the prior pass's
|
||||||
|
/// captured set (empty string on the first tick); re-discovery stops once a non-empty set repeats it.</summary>
|
||||||
|
private sealed record RediscoverTick(int Generation, int Attempt, string PreviousSignature);
|
||||||
public sealed class RetryConnect
|
public sealed class RetryConnect
|
||||||
{
|
{
|
||||||
public static readonly RetryConnect Instance = new();
|
public static readonly RetryConnect Instance = new();
|
||||||
@@ -191,7 +199,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
IDriverHealthPublisher? healthPublisher = null,
|
IDriverHealthPublisher? healthPublisher = null,
|
||||||
string? clusterId = null,
|
string? clusterId = null,
|
||||||
TimeSpan? rediscoverInterval = null,
|
TimeSpan? rediscoverInterval = null,
|
||||||
int rediscoverMaxAttempts = 15) =>
|
int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts) =>
|
||||||
Akka.Actor.Props.Create(() => new DriverInstanceActor(
|
Akka.Actor.Props.Create(() => new DriverInstanceActor(
|
||||||
driver,
|
driver,
|
||||||
reconnectInterval ?? DefaultReconnectInterval,
|
reconnectInterval ?? DefaultReconnectInterval,
|
||||||
@@ -240,14 +248,14 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
IDriverHealthPublisher? healthPublisher = null,
|
IDriverHealthPublisher? healthPublisher = null,
|
||||||
string? clusterId = null,
|
string? clusterId = null,
|
||||||
TimeSpan? rediscoverInterval = null,
|
TimeSpan? rediscoverInterval = null,
|
||||||
int rediscoverMaxAttempts = 15)
|
int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts)
|
||||||
{
|
{
|
||||||
_driver = driver;
|
_driver = driver;
|
||||||
_driverInstanceId = driver.DriverInstanceId;
|
_driverInstanceId = driver.DriverInstanceId;
|
||||||
_clusterId = clusterId ?? string.Empty;
|
_clusterId = clusterId ?? string.Empty;
|
||||||
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
|
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
|
||||||
_reconnectInterval = reconnectInterval;
|
_reconnectInterval = reconnectInterval;
|
||||||
_rediscoverInterval = rediscoverInterval ?? TimeSpan.FromSeconds(2);
|
_rediscoverInterval = rediscoverInterval ?? DefaultRediscoverInterval;
|
||||||
_rediscoverMaxAttempts = rediscoverMaxAttempts;
|
_rediscoverMaxAttempts = rediscoverMaxAttempts;
|
||||||
OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
|
OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
|
||||||
new KeyValuePair<string, object?>("event", startStubbed ? "spawn_stub" : "spawn"),
|
new KeyValuePair<string, object?>("event", startStubbed ? "spawn_stub" : "spawn"),
|
||||||
@@ -286,6 +294,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
|
Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
|
||||||
Receive<ForceReconnect>(_ => { /* stubbed drivers don't reconnect */ });
|
Receive<ForceReconnect>(_ => { /* stubbed drivers don't reconnect */ });
|
||||||
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
|
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
|
||||||
|
// Stubbed drivers never enter Connected, so they never kick discovery; swallow defensively in case a
|
||||||
|
// re-discovery self-tick is ever routed here so it doesn't surface as an Akka Unhandled message.
|
||||||
|
Receive<RediscoverTick>(_ => { });
|
||||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -339,6 +350,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
// A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
|
// A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
|
||||||
// this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
|
// this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
|
||||||
Receive<SubscribeAlarms>(_ => { });
|
Receive<SubscribeAlarms>(_ => { });
|
||||||
|
// Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an
|
||||||
|
// already-queued disconnect; swallow it — the next Connected entry re-kicks discovery.
|
||||||
|
Receive<RediscoverTick>(_ => { });
|
||||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -444,6 +458,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
// A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
|
// A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
|
||||||
// this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
|
// this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
|
||||||
Receive<SubscribeAlarms>(_ => { });
|
Receive<SubscribeAlarms>(_ => { });
|
||||||
|
// Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an
|
||||||
|
// already-queued disconnect; swallow it — the next Connected entry re-kicks discovery.
|
||||||
|
Receive<RediscoverTick>(_ => { });
|
||||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||||
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
|
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
|
||||||
}
|
}
|
||||||
@@ -712,52 +729,67 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
/// <summary>Kick the bounded post-connect re-discovery loop on a <c>Connected</c> entry. A no-op unless the
|
/// <summary>Kick the bounded post-connect re-discovery loop on a <c>Connected</c> entry. A no-op unless the
|
||||||
/// driver exposes <see cref="ITagDiscovery"/> (nothing to inject otherwise). Self-sends the first
|
/// driver exposes <see cref="ITagDiscovery"/> (nothing to inject otherwise). Self-sends the first
|
||||||
/// <see cref="RediscoverTick"/> tagged with the current init generation so a tick that outlives a reconnect
|
/// <see cref="RediscoverTick"/> tagged with the current init generation so a tick that outlives a reconnect
|
||||||
/// is rejected by the generation guard in <see cref="HandleRediscoverAsync"/>.</summary>
|
/// is rejected by the generation guard in <see cref="HandleRediscoverAsync"/>.
|
||||||
|
/// <para>Generic by design: re-discovery runs for EVERY <see cref="ITagDiscovery"/> driver on each
|
||||||
|
/// (re)connect, bounded by stop-on-stable (the discovered-set signature repeats) + the attempt cap.
|
||||||
|
/// Narrowing this to opt-in for heavy network drivers (Galaxy / OpcUaClient) is a follow-up.</para></summary>
|
||||||
private void StartDiscovery()
|
private void StartDiscovery()
|
||||||
{
|
{
|
||||||
if (_driver is not ITagDiscovery) return; // driver doesn't expose discovery — nothing to inject
|
if (_driver is not ITagDiscovery) return; // driver doesn't expose discovery — nothing to inject
|
||||||
Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousCount: -1));
|
Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousSignature: string.Empty));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Runs one post-connect discovery pass: captures the driver's streamed FixedTree via a
|
/// <summary>Runs one post-connect discovery pass: captures the driver's streamed FixedTree via a
|
||||||
/// <see cref="CapturingAddressSpaceBuilder"/> and ships the result to the parent as
|
/// <see cref="CapturingAddressSpaceBuilder"/> and ships the result to the parent as
|
||||||
/// <see cref="DiscoveredNodesReady"/> (empty/duplicate sets are fine — the parent dedups and injection
|
/// <see cref="DiscoveredNodesReady"/> (empty/duplicate sets are fine — the parent dedups and injection
|
||||||
/// is idempotent). Retries on the <see cref="_rediscoverInterval"/> until the non-empty discovered set
|
/// is idempotent). Retries on the <see cref="_rediscoverInterval"/> until the non-empty discovered SET
|
||||||
/// has STABILISED (same count two passes running) or the <see cref="_rediscoverMaxAttempts"/> cap is hit,
|
/// has STABILISED (the ordered-distinct full-reference signature repeats — robust for incremental/paged
|
||||||
/// whichever comes first; keeps retrying while empty because a FOCAS-style FixedTree cache may still be
|
/// browsers where a count alone could falsely settle a partial tree) or the <see cref="_rediscoverMaxAttempts"/>
|
||||||
/// populating. The generation guard (checked before and again after the await) drops a tick from a
|
/// cap is hit, whichever comes first; keeps retrying while empty because a FOCAS-style FixedTree cache may
|
||||||
/// superseded (re)connect so a stale loop cannot resurrect or double-ship.
|
/// still be populating.
|
||||||
/// <para>Limitation: this assumes a driver's discovered set only GROWS toward a stable size (true for
|
/// <para>Limitation: this assumes a driver's discovered set only GROWS toward a stable shape (true for
|
||||||
/// FOCAS — its FixedTree appears once, and on the wonder deploy the driver-config <c>_options.Tags</c> is
|
/// FOCAS — its FixedTree appears once, and on the wonder deploy the driver-config <c>_options.Tags</c> is
|
||||||
/// empty so the set is 0 until the cache populates). A driver that emits an initial non-empty set and
|
/// empty so the set is 0 until the cache populates). A driver that emits an initial non-empty set and
|
||||||
/// later grows could stop early on a transient repeat; acceptable for current scope.</para></summary>
|
/// later grows could stop early on a transient repeat; acceptable for current scope.</para></summary>
|
||||||
private async Task HandleRediscoverAsync(RediscoverTick tick)
|
private async Task HandleRediscoverAsync(RediscoverTick tick)
|
||||||
{
|
{
|
||||||
if (tick.Generation != _initGeneration) return; // stale (a reconnect happened)
|
if (tick.Generation != _initGeneration) return; // stale (a reconnect superseded this pass)
|
||||||
if (_driver is not ITagDiscovery discovery) return;
|
if (_driver is not ITagDiscovery discovery) return;
|
||||||
|
|
||||||
IReadOnlyList<DiscoveredNode> nodes;
|
IReadOnlyList<DiscoveredNode> nodes;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var builder = new CapturingAddressSpaceBuilder();
|
var builder = new CapturingAddressSpaceBuilder();
|
||||||
await discovery.DiscoverAsync(builder, CancellationToken.None).ConfigureAwait(false);
|
// Bound the browse — ReceiveAsync suspends the mailbox for the whole handler, so an unbounded
|
||||||
nodes = builder.Nodes;
|
// DiscoverAsync would block DisconnectObserved / ForceReconnect / writes / health-poll behind it.
|
||||||
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
|
||||||
|
// NO ConfigureAwait(false): a genuinely-async DiscoverAsync (Galaxy / OpcUaClient / TwinCAT) must
|
||||||
|
// resume on the actor task scheduler so the Context.Parent.Tell + Timers calls below run with a
|
||||||
|
// live ActorContext. ConfigureAwait(false) would resume off-context and throw
|
||||||
|
// NotSupportedException("no active ActorContext") — see the same warning on HandleSubscribeAsync.
|
||||||
|
await discovery.DiscoverAsync(builder, cts.Token);
|
||||||
|
nodes = builder.Nodes.ToArray(); // immutable snapshot — never hand the builder's live list across actors
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_log.Debug(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt);
|
_log.Warning(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt);
|
||||||
nodes = Array.Empty<DiscoveredNode>();
|
nodes = Array.Empty<DiscoveredNode>();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tick.Generation != _initGeneration) return; // re-check after the await (state may have changed)
|
// Belt-and-suspenders: under ReceiveAsync the mailbox is suspended for the whole handler, so
|
||||||
|
// _initGeneration cannot change mid-await — the pre-await guard + Timers.Cancel("rediscover") on
|
||||||
|
// disconnect + single-timer key reuse are the primary protections. Re-checked in case that changes.
|
||||||
|
if (tick.Generation != _initGeneration) return;
|
||||||
|
|
||||||
Context.Parent.Tell(new DiscoveredNodesReady(_driverInstanceId, nodes));
|
Context.Parent.Tell(new DiscoveredNodesReady(_driverInstanceId, nodes));
|
||||||
|
|
||||||
// Stop when the non-empty discovered set has stabilised, or the attempt cap is hit. Keep retrying
|
// Stop when the non-empty discovered SET has stabilised (its signature repeats), or the attempt cap
|
||||||
// while empty (FixedTree cache may still be populating). PreviousCount=-1 on the first pass.
|
// is hit. Keep retrying while empty (a FixedTree cache may still be populating). First tick carries "".
|
||||||
var stableNonEmpty = nodes.Count > 0 && nodes.Count == tick.PreviousCount;
|
var signature = string.Join('\u0001',
|
||||||
|
nodes.Select(n => n.FullReference).Distinct(StringComparer.Ordinal).OrderBy(x => x, StringComparer.Ordinal));
|
||||||
|
var stableNonEmpty = nodes.Count > 0 && string.Equals(signature, tick.PreviousSignature, StringComparison.Ordinal);
|
||||||
if (tick.Attempt + 1 < _rediscoverMaxAttempts && !stableNonEmpty)
|
if (tick.Attempt + 1 < _rediscoverMaxAttempts && !stableNonEmpty)
|
||||||
Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, tick.Attempt + 1, nodes.Count), _rediscoverInterval);
|
Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, tick.Attempt + 1, signature), _rediscoverInterval);
|
||||||
else
|
else
|
||||||
_log.Debug("DriverInstance {Id}: discovery settled after {Attempt} pass(es), {Count} node(s)", _driverInstanceId, tick.Attempt + 1, nodes.Count);
|
_log.Debug("DriverInstance {Id}: discovery settled after {Attempt} pass(es), {Count} node(s)", _driverInstanceId, tick.Attempt + 1, nodes.Count);
|
||||||
}
|
}
|
||||||
|
|||||||
+112
@@ -118,6 +118,59 @@ public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
driver.DiscoverCount.ShouldBeGreaterThan(passesBeforeReconnect);
|
driver.DiscoverCount.ShouldBeGreaterThan(passesBeforeReconnect);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Regression for the Critical: a driver whose <c>DiscoverAsync</c> completes ASYNCHRONOUSLY (off the
|
||||||
|
/// actor thread) must still ship <see cref="DriverInstanceActor.DiscoveredNodesReady"/>. The handler
|
||||||
|
/// touches <c>Context.Parent</c> + <c>Timers</c> AFTER awaiting discovery; if it awaited with
|
||||||
|
/// <c>ConfigureAwait(false)</c> the continuation would resume off the actor context and those calls
|
||||||
|
/// would throw <c>NotSupportedException("no active ActorContext")</c> — the handler would fault and no
|
||||||
|
/// message would arrive. Synchronous (<c>Task.CompletedTask</c>) stubs mask the bug; this one forces a
|
||||||
|
/// genuine off-context resume (modelled on <c>SubscribableStubDriver.UnsubscribeYields</c>).
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public void Async_completing_discovery_resumes_on_actor_context_and_publishes()
|
||||||
|
{
|
||||||
|
var driver = new YieldingDiscoverableStubDriver();
|
||||||
|
var parent = CreateTestProbe();
|
||||||
|
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
|
||||||
|
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20)));
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||||
|
|
||||||
|
// With the fix the handler resumes on the actor context, so the publish succeeds and the parent gets
|
||||||
|
// a non-empty set. Without it the handler faults at Context.Parent.Tell and this times out.
|
||||||
|
var published = parent.ExpectMsg<DriverInstanceActor.DiscoveredNodesReady>(TimeSpan.FromSeconds(2));
|
||||||
|
published.Nodes.Count.ShouldBe(3);
|
||||||
|
published.DriverInstanceId.ShouldBe(driver.DriverInstanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The attempt cap bounds a discovered set that never stabilises: a driver whose set keeps GROWING
|
||||||
|
/// (1,2,3,…) never repeats its signature, so the loop is stopped only by
|
||||||
|
/// <c>rediscoverMaxAttempts</c>. With a cap of 3, exactly 3 passes are published, then the stream stops.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public void Never_stabilising_discovery_is_bounded_by_the_attempt_cap()
|
||||||
|
{
|
||||||
|
var driver = new GrowingDiscoverableStubDriver();
|
||||||
|
var parent = CreateTestProbe();
|
||||||
|
var actor = parent.ChildActorOf(DriverInstanceActor.Props(
|
||||||
|
driver, rediscoverInterval: TimeSpan.FromMilliseconds(20), rediscoverMaxAttempts: 3));
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||||
|
|
||||||
|
var msgs = new List<DriverInstanceActor.DiscoveredNodesReady>();
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
msgs.Add(parent.ExpectMsg<DriverInstanceActor.DiscoveredNodesReady>(TimeSpan.FromSeconds(2)));
|
||||||
|
|
||||||
|
// Cap reached — no fourth pass even though the set never stabilised.
|
||||||
|
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||||
|
|
||||||
|
// The set genuinely kept growing across the capped passes (1,2,3 nodes).
|
||||||
|
msgs.Select(m => m.Nodes.Count).ShouldBe(new[] { 1, 2, 3 });
|
||||||
|
driver.DiscoverCount.ShouldBe(3);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// A <see cref="StubDriver"/> that also exposes <see cref="ITagDiscovery"/>. Each <c>DiscoverAsync</c>
|
/// A <see cref="StubDriver"/> that also exposes <see cref="ITagDiscovery"/>. Each <c>DiscoverAsync</c>
|
||||||
/// pass is counted; passes 1–2 yield nothing (cache warming), passes 3+ yield a stable 3-node set —
|
/// pass is counted; passes 1–2 yield nothing (cache warming), passes 3+ yield a stable 3-node set —
|
||||||
@@ -149,4 +202,63 @@ public sealed class DriverInstanceActorDiscoveryTests : RuntimeActorTestBase
|
|||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A discoverable driver whose <c>DiscoverAsync</c> genuinely SUSPENDS and resumes on a fresh
|
||||||
|
/// thread-pool thread that carries NO Akka actor cell — modelled on
|
||||||
|
/// <c>SubscribableStubDriver.UnsubscribeYields</c>. This forces the actor's <c>await DiscoverAsync(...)</c>
|
||||||
|
/// continuation to resume off-context unless the handler omits <c>ConfigureAwait(false)</c>, so it is a
|
||||||
|
/// deterministic repro of the no-ActorContext race. Returns a stable 3-node set on every pass.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class YieldingDiscoverableStubDriver : StubDriver, ITagDiscovery
|
||||||
|
{
|
||||||
|
/// <summary>Suspends on a TCS completed from a background thread, then streams 3 nodes.</summary>
|
||||||
|
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
_ = Task.Run(() => tcs.SetResult(), cancellationToken);
|
||||||
|
await tcs.Task.ConfigureAwait(false); // resume on a clean thread-pool thread (no actor cell)
|
||||||
|
var fixedTree = builder.Folder("FixedTree", "FixedTree");
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
{
|
||||||
|
fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo(
|
||||||
|
FullName: $"m.fixed.v{i}",
|
||||||
|
DriverDataType: DriverDataType.Float64,
|
||||||
|
IsArray: false,
|
||||||
|
ArrayDim: null,
|
||||||
|
SecurityClass: SecurityClassification.ViewOnly,
|
||||||
|
IsHistorized: false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A discoverable driver whose set NEVER stabilises: pass N yields N nodes (1,2,3,…), so the
|
||||||
|
/// full-reference signature differs every pass and the loop can only be bounded by the attempt cap.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class GrowingDiscoverableStubDriver : StubDriver, ITagDiscovery
|
||||||
|
{
|
||||||
|
private int _passCount;
|
||||||
|
|
||||||
|
/// <summary>Number of <see cref="DiscoverAsync"/> passes the actor has driven.</summary>
|
||||||
|
public int DiscoverCount => Volatile.Read(ref _passCount);
|
||||||
|
|
||||||
|
/// <summary>Streams an ever-growing node set (pass N → N nodes).</summary>
|
||||||
|
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var pass = Interlocked.Increment(ref _passCount); // 1-based pass number
|
||||||
|
var fixedTree = builder.Folder("FixedTree", "FixedTree");
|
||||||
|
for (var i = 0; i < pass; i++)
|
||||||
|
{
|
||||||
|
fixedTree.Variable($"v{i}", $"v{i}", new DriverAttributeInfo(
|
||||||
|
FullName: $"m.fixed.v{i}",
|
||||||
|
DriverDataType: DriverDataType.Float64,
|
||||||
|
IsArray: false,
|
||||||
|
ArrayDim: null,
|
||||||
|
SecurityClass: SecurityClassification.ViewOnly,
|
||||||
|
IsHistorized: false));
|
||||||
|
}
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user