fix(runtime): fast-fail writes in degraded driver states + swallow self SubscriptionEstablished

This commit is contained in:
Joseph Doherty
2026-06-14 00:34:37 -04:00
parent 99eea0b455
commit 42b4a923fd
2 changed files with 298 additions and 0 deletions
@@ -211,6 +211,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private void Connecting()
{
Receive<InitializeRequested>(msg => InitializeAsync(msg.DriverConfigJson));
// Fast-fail writes while still connecting — without this the inbound WriteAttribute dead-letters
// and DriverHostActor.HandleRouteNodeWrite waits its full 8s Ask before reporting a generic
// "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance).
Receive<WriteAttribute>(_ =>
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
Receive<InitializeSucceeded>(_ =>
{
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
@@ -227,6 +232,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
});
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
Receive<ForceReconnect>(_ => { /* already connecting — no-op */ });
// ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the
// sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
Receive<SubscriptionEstablished>(msg =>
_log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
_driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
@@ -259,12 +269,21 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe());
});
Receive<DataChangeForward>(OnDataChangeForward);
// ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the
// sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
Receive<SubscriptionEstablished>(msg =>
_log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
_driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
private void Reconnecting()
{
Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));
// Fast-fail writes while reconnecting (same reason as Connecting — avoids the 8s host Ask
// timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance).
Receive<WriteAttribute>(_ =>
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
Receive<InitializeSucceeded>(_ =>
{
Timers.Cancel("retry-connect");
@@ -276,6 +295,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
Receive<ForceReconnect>(_ => { /* already reconnecting — no-op */ });
// ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the
// sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
Receive<SubscriptionEstablished>(msg =>
_log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
_driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
}
@@ -0,0 +1,274 @@
using Akka.Actor;
using Akka.Event;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
/// <summary>
/// Covers two robustness nits on the generic <see cref="DriverInstanceActor"/>:
/// (1) writes that arrive while the actor is NOT in <c>Connected</c> (Stubbed / Connecting /
/// Reconnecting) must fast-fail with a negative <see cref="DriverInstanceActor.WriteAttributeResult"/>
/// rather than dead-letter and let the host Ask wait its full 8s timeout; and
/// (2) the self-Tell <c>Subscribe</c> issued by <c>ResubscribeDesired</c> on (re)connect must not
/// leave a stray <see cref="DriverInstanceActor.SubscriptionEstablished"/> dead-lettering when its
/// reply lands back at Self.
/// </summary>
public sealed class DriverInstanceActorWriteAndSubscribeTests : RuntimeActorTestBase
{
/// <summary>
/// A write to a Stubbed actor fast-fails synchronously: a Stubbed driver never connects, yet
/// it must answer a <see cref="DriverInstanceActor.WriteAttribute"/> Ask with a negative result
/// well inside the host's 8s timeout instead of dead-lettering.
/// </summary>
[Fact]
public async Task WriteAttribute_to_stubbed_driver_fast_fails()
{
var driver = new WritableStubDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, startStubbed: true));
var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(2));
// Stubbed drivers deterministically succeed writes without touching hardware (existing behaviour).
reply.Success.ShouldBeTrue();
}
/// <summary>
/// A write to a Connecting actor (InitializeAsync still in flight, never resolves) fast-fails
/// with <c>Success=false</c> and a non-null reason in well under the 8s host Ask timeout —
/// proving the synchronous fast-fail in <c>Connecting()</c> fires instead of dead-lettering.
/// </summary>
[Fact]
public async Task WriteAttribute_to_connecting_driver_fast_fails()
{
// BlockingInitDriver.InitializeAsync never completes, so the actor stays in Connecting.
var driver = new BlockingInitDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeStarted, TimeSpan.FromSeconds(2));
var sw = System.Diagnostics.Stopwatch.StartNew();
var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(2));
sw.Stop();
reply.Success.ShouldBeFalse();
reply.Reason.ShouldNotBeNull();
sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2));
}
/// <summary>
/// A write to a Reconnecting actor (initial connect failed, retrying) fast-fails with a negative
/// result rather than dead-lettering and hanging the host Ask.
/// </summary>
[Fact]
public async Task WriteAttribute_to_reconnecting_driver_fast_fails()
{
// InitializeShouldThrow drives Connecting → Reconnecting; the slow reconnect interval keeps it
// parked there long enough to take a write.
var driver = new WritableStubDriver { InitializeShouldThrow = true };
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// The first Initialize attempt throws and pushes the actor into Reconnecting.
AwaitCondition(() => driver.InitializeCount >= 1, TimeSpan.FromSeconds(2));
var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(2));
reply.Success.ShouldBeFalse();
reply.Reason.ShouldNotBeNull();
}
/// <summary>
/// Driving a connect + auto-resubscribe cycle (the self-Tell <c>Subscribe</c> from
/// <c>ResubscribeDesired</c> whose <c>SubscriptionEstablished</c> reply lands at Self) must not
/// produce a <see cref="DriverInstanceActor.SubscriptionEstablished"/> dead-letter.
/// </summary>
[Fact]
public async Task Self_resubscribe_does_not_deadletter_SubscriptionEstablished()
{
// The probe IGNORES every DeadLetter except those carrying a SubscriptionEstablished payload,
// so its mailbox only ever holds the message we care about. That makes the assertion precise:
// any swallow miss surfaces as a SubscriptionEstablished DeadLetter, and nothing else
// (health-poll cruft, remoting-terminator letters) can give a false positive.
var deadLetters = CreateTestProbe();
deadLetters.IgnoreMessages(m => m is not AllDeadLetters { Message: DriverInstanceActor.SubscriptionEstablished });
Sys.EventStream.Subscribe(deadLetters.Ref, typeof(AllDeadLetters));
var driver = new SubscribableStubDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
// Desired set arrives first; on connect, ResubscribeDesired self-Tells Subscribe, whose
// SubscriptionEstablished reply lands at Self.
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2));
// Force a reconnect to exercise the resubscribe path a second time.
actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip"));
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
// BARRIER: the self-resubscribe reply is published asynchronously, so SubscribeCount>=2 alone
// doesn't guarantee the actor has yet PROCESSED the self-sent SubscriptionEstablished (the point
// where it either swallows it or — pre-fix — dead-letters it). Drive a real Subscribe Ask from
// the TEST and await its reply: Akka processes the mailbox in order, so once this round-trips,
// every earlier self-resubscribe reply has already been handled and any dead-letter has already
// been published to the EventStream (Unhandled publishes synchronously). Only THEN is ExpectNoMsg
// race-free.
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)),
TimeSpan.FromSeconds(3));
deadLetters.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
// --- stub drivers (mirrors DriverInstanceActorTests) ------------------------------------------
private class StubDriver : IDriver
{
/// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
public bool InitializeShouldThrow { get; set; }
/// <summary>Gets the number of times initialization was called.</summary>
public int InitializeCount;
/// <summary>Gets the driver instance ID.</summary>
public string DriverInstanceId => "stub-driver-1";
/// <summary>Gets the driver type.</summary>
public string DriverType => "Stub";
/// <summary>Initializes the driver with the specified configuration JSON.</summary>
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref InitializeCount);
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
return Task.CompletedTask;
}
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) =>
Task.CompletedTask;
/// <summary>Shuts down the driver.</summary>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Gets the health status of the driver.</summary>
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver.</summary>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver.</summary>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
/// <summary>
/// A standalone <see cref="IDriver"/> (and <see cref="IWritable"/>) whose <c>InitializeAsync</c>
/// never completes, parking the actor in <c>Connecting</c>. Implements the interface directly
/// (not via <see cref="StubDriver"/>) so the never-completing Init is reached through the
/// polymorphic <see cref="IDriver"/> call the actor makes.
/// </summary>
private sealed class BlockingInitDriver : IDriver, IWritable
{
/// <summary>Set true the moment InitializeAsync is entered.</summary>
public volatile bool InitializeStarted;
private readonly TaskCompletionSource _gate = new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>Gets the driver instance ID.</summary>
public string DriverInstanceId => "blocking-init-driver";
/// <summary>Gets the driver type.</summary>
public string DriverType => "Stub";
/// <summary>Never completes — keeps the actor in Connecting.</summary>
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
InitializeStarted = true;
return _gate.Task;
}
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) =>
Task.CompletedTask;
/// <summary>Shuts down the driver.</summary>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Gets the health status of the driver.</summary>
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver.</summary>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver.</summary>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Writes the specified requests (never reached — actor fast-fails in Connecting).</summary>
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
IReadOnlyList<WriteResult> results = writes.Select(_ => new WriteResult(0u)).ToList();
return Task.FromResult(results);
}
}
private sealed class WritableStubDriver : StubDriver, IWritable
{
/// <summary>Gets or sets the next status code to return from write operations.</summary>
public uint NextStatusCode { get; set; } = 0u;
/// <summary>Gets the list of write requests received.</summary>
public List<WriteRequest> Writes { get; } = new();
/// <summary>Writes the specified requests.</summary>
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
Writes.AddRange(writes);
IReadOnlyList<WriteResult> results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList();
return Task.FromResult(results);
}
}
private sealed class SubscribableStubDriver : StubDriver, ISubscribable
{
/// <summary>Occurs when data changes.</summary>
public event EventHandler<DataChangeEventArgs>? OnDataChange;
private readonly StubHandle _handle = new();
/// <summary>Gets the number of subscribers to OnDataChange.</summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>Number of times <see cref="SubscribeAsync"/> was called.</summary>
public int SubscribeCount;
/// <summary>Subscribes to the specified full references.</summary>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
Interlocked.Increment(ref SubscribeCount);
return Task.FromResult<ISubscriptionHandle>(_handle);
}
/// <summary>Unsubscribes from the specified subscription handle.</summary>
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) =>
Task.CompletedTask;
/// <summary>Fires a data change event with the specified parameters.</summary>
public void FireDataChange(string fullRef, object? value, uint statusCode)
{
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot));
}
private sealed class StubHandle : ISubscriptionHandle
{
/// <summary>Gets the diagnostic ID of the subscription.</summary>
public string DiagnosticId => "stub-sub";
}
}
}