using Akka.Actor;
using Akka.Event;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
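/// <summary>
/// Covers two robustness nits on the generic <see cref="DriverInstanceActor"/>:
/// (1) writes that arrive while the actor is NOT in Connected (Stubbed / Connecting /
/// Reconnecting) must be answered promptly rather than dead-letter and let the host Ask wait
/// its full 8s timeout — Connecting and Reconnecting must fast-fail with a negative write reply,
/// while Stubbed answers with a success reply (see the Stubbed test below); and
/// (2) the self-Tell Subscribe issued by ResubscribeDesired on (re)connect must not
/// leave a stray <see cref="DriverInstanceActor.SubscriptionEstablished"/> (or, on the failure
/// path, <see cref="DriverInstanceActor.SubscriptionFailed"/>) dead-lettering when its
/// reply lands back at Self.
/// </summary>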
public sealed class DriverInstanceActorWriteAndSubscribeTests : RuntimeActorTestBase
{
/// <summary>
/// A write to a Stubbed actor is answered promptly: a Stubbed driver never connects, yet
/// it must answer a <see cref="DriverInstanceActor.WriteAttribute"/> Ask well inside the
/// host's 8s timeout instead of dead-lettering. Unlike the Connecting / Reconnecting cases,
/// the reply asserted here is a SUCCESS (Success is true), not a negative result.
/// </summary>
[Fact]
public async Task WriteAttribute_to_stubbed_driver_fast_fails()
{
// NOTE(review): the method name says "fast_fails" but the assertion below expects success;
// consider renaming so the name matches the asserted behaviour.
var driver = new WritableStubDriver();
// startStubbed: true is presumably what parks the actor in its Stubbed state — confirm
// against DriverInstanceActor.Props.
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, startStubbed: true));
// The 2s Ask timeout is the test's own upper bound on how long the reply may take.
var reply = await actor.Ask(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(2));
// Stubbed drivers deterministically succeed writes without touching hardware (existing behaviour).
reply.Success.ShouldBeTrue();
}
/// <summary>
/// A write to a Connecting actor (InitializeAsync still in flight, never resolves) fast-fails
/// with Success=false and a non-null reason in well under the 8s host Ask timeout —
/// proving the synchronous fast-fail in Connecting() fires instead of dead-lettering.
/// </summary>
[Fact]
public async Task WriteAttribute_to_connecting_driver_fast_fails()
{
// BlockingInitDriver.InitializeAsync never completes, so the actor stays in Connecting.
var driver = new BlockingInitDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// InitializeStarted flips to true as soon as InitializeAsync is entered, so from here on the
// driver's initialization is in flight and will not finish.
AwaitCondition(() => driver.InitializeStarted, TimeSpan.FromSeconds(2));
// Time only the write round-trip, not the setup above.
var sw = System.Diagnostics.Stopwatch.StartNew();
var reply = await actor.Ask(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(2));
sw.Stop();
// A fast-fail must be an explicit negative reply that carries a reason.
reply.Success.ShouldBeFalse();
reply.Reason.ShouldNotBeNull();
// NOTE(review): the Ask above already uses a 2s timeout, so this bound mostly documents intent;
// a reply slower than 2s would presumably fail the Ask before this line is reached.
sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2));
}
/// <summary>
/// A write to a Reconnecting actor (initial connect failed, retrying) fast-fails with a negative
/// result rather than dead-lettering and hanging the host Ask.
/// </summary>
[Fact]
public async Task WriteAttribute_to_reconnecting_driver_fast_fails()
{
// InitializeShouldThrow drives Connecting → Reconnecting; the slow reconnect interval keeps it
// parked there long enough to take a write.
var driver = new WritableStubDriver { InitializeShouldThrow = true };
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// The first Initialize attempt throws and pushes the actor into Reconnecting.
// NOTE(review): InitializeCount is incremented at the top of StubDriver.InitializeAsync, before
// the throw, so this condition only shows that Initialize was ENTERED. The write below may
// therefore still be handled in Connecting rather than Reconnecting — confirm whether a
// stronger signal for "actor is Reconnecting" is available.
AwaitCondition(() => driver.InitializeCount >= 1, TimeSpan.FromSeconds(2));
// 2s is the test's own bound for receiving the reply.
var reply = await actor.Ask(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(2));
// A fast-fail must be an explicit negative reply that carries a reason.
reply.Success.ShouldBeFalse();
reply.Reason.ShouldNotBeNull();
}
/// <summary>
/// Driving a connect + auto-resubscribe cycle (the self-Tell Subscribe from
/// ResubscribeDesired whose <see cref="DriverInstanceActor.SubscriptionEstablished"/> reply
/// lands at Self) must not produce a <see cref="DriverInstanceActor.SubscriptionEstablished"/>
/// dead-letter.
/// </summary>
[Fact]
public async Task Self_resubscribe_does_not_deadletter_SubscriptionEstablished()
{
// The probe IGNORES every DeadLetter except those carrying a SubscriptionEstablished payload,
// so its mailbox only ever holds the message we care about. That makes the assertion precise:
// any swallow miss surfaces as a SubscriptionEstablished DeadLetter, and nothing else
// (health-poll cruft, remoting-terminator letters) can give a false positive.
var deadLetters = CreateTestProbe();
deadLetters.IgnoreMessages(m => m is not AllDeadLetters { Message: DriverInstanceActor.SubscriptionEstablished });
Sys.EventStream.Subscribe(deadLetters.Ref, typeof(AllDeadLetters));
var driver = new SubscribableStubDriver();
// Short reconnect interval so the forced reconnect below completes within the 3s wait.
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
// Desired set arrives first; on connect, ResubscribeDesired self-Tells Subscribe, whose
// SubscriptionEstablished reply lands at Self.
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// SubscribeCount is bumped each time the stub driver's SubscribeAsync is called.
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2));
// Force a reconnect to exercise the resubscribe path a second time.
actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip"));
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
// BARRIER: the self-resubscribe reply is published asynchronously, so SubscribeCount>=2 alone
// doesn't guarantee the actor has yet PROCESSED the self-sent SubscriptionEstablished (the point
// where it either swallows it or — pre-fix — dead-letters it). Drive a real Subscribe Ask from
// the TEST and await its reply: Akka processes the mailbox in order, so once this round-trips,
// every earlier self-resubscribe reply has already been handled and any dead-letter has already
// been published to the EventStream (Unhandled publishes synchronously). Only THEN is ExpectNoMsg
// race-free.
await actor.Ask(
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)),
TimeSpan.FromSeconds(3));
// Everything except SubscriptionEstablished dead-letters is filtered out above, so silence
// for 500ms means no such dead-letter reached the probe.
deadLetters.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
/// <summary>
/// When HandleSubscribeAsync throws (e.g. the driver's SubscribeAsync faults),
/// it replies <see cref="DriverInstanceActor.SubscriptionFailed"/> to the sender. On the
/// self-resubscribe path (ResubscribeDesired self-Tells Subscribe) the sender is
/// Self, so the failure reply lands back at the actor. Without a handler, it dead-letters.
/// This test verifies the symmetric swallow (added alongside the
/// <see cref="DriverInstanceActor.SubscriptionEstablished"/> one) prevents the dead-letter.
/// </summary>
[Fact]
public async Task Self_resubscribe_failure_does_not_deadletter_SubscriptionFailed()
{
// Subscribe to AllDeadLetters but ignore everything EXCEPT those carrying a
// SubscriptionFailed payload — keeps the probe mailbox precise.
var deadLetterProbe = CreateTestProbe();
deadLetterProbe.IgnoreMessages(
m => m is not AllDeadLetters { Message: DriverInstanceActor.SubscriptionFailed });
Sys.EventStream.Subscribe(deadLetterProbe.Ref, typeof(AllDeadLetters));
// Bring the actor to Connected state using a subscribable driver.
var driver = new SubscribableStubDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// Wait until Connected (subscribe path available).
// NOTE(review): InitializeCount >= 1 only shows that InitializeAsync was entered; it does not
// by itself prove the actor has already switched to Connected when the Tell below is sent —
// confirm the SubscriptionFailed message cannot be handled in an earlier state.
AwaitCondition(() => driver.InitializeCount >= 1, TimeSpan.FromSeconds(2));
// Tell SubscriptionFailed directly — a faithful stand-in for the self-reply landing on the
// actor on the self-resubscribe path. This exercises exactly the SubscriptionFailed Receive
// handler (or proves its absence via a dead-letter).
actor.Tell(new DriverInstanceActor.SubscriptionFailed("boom"));
// Drive a round-trip to flush the actor mailbox before asserting.
// Subscribe Ask (which the Connected state can answer) as the barrier.
await actor.Ask(
new DriverInstanceActor.Subscribe(new[] { "barrier-ref" }, TimeSpan.FromMilliseconds(100)),
TimeSpan.FromSeconds(3));
// Everything except SubscriptionFailed dead-letters is filtered out above, so silence
// for 500ms means no such dead-letter reached the probe.
deadLetterProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
// --- stub drivers (mirrors DriverInstanceActorTests) ------------------------------------------
/// <summary>
/// Minimal <see cref="IDriver"/> test double: counts calls to InitializeAsync, can be told to
/// throw from it, and answers every other member with a fixed, already-completed result.
/// Not sealed because the writable / subscribable stubs derive from it.
/// </summary>
private class StubDriver : IDriver
{
    // Written inside InitializeAsync and polled by the tests through AwaitCondition, so it is
    // only touched via Interlocked / Volatile to keep the cross-thread read well-defined.
    private int _initializeCount;

    /// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
    public bool InitializeShouldThrow { get; set; }

    /// <summary>Gets the number of times initialization was called.</summary>
    public int InitializeCount => Volatile.Read(ref _initializeCount);

    /// <summary>Gets the driver instance ID.</summary>
    public string DriverInstanceId => "stub-driver-1";

    /// <summary>Gets the driver type.</summary>
    public string DriverType => "Stub";

    /// <summary>Initializes the driver with the specified configuration JSON.</summary>
    /// <param name="driverConfigJson">Driver configuration; ignored by the stub.</param>
    /// <param name="cancellationToken">Cancellation token; ignored by the stub.</param>
    /// <returns>A completed task when <see cref="InitializeShouldThrow"/> is false.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when <see cref="InitializeShouldThrow"/> is true.
    /// </exception>
    public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        // Count the attempt first so a failing attempt is still observable by the tests.
        Interlocked.Increment(ref _initializeCount);

        if (InitializeShouldThrow)
        {
            // This method is not async, so the exception escapes the call itself rather than
            // being captured in the returned task.
            throw new InvalidOperationException("stub-init-fail");
        }

        return Task.CompletedTask;
    }

    /// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
    public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) =>
        Task.CompletedTask;

    /// <summary>Shuts down the driver.</summary>
    public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    /// <summary>Gets the health status of the driver; the stub always reports Healthy.</summary>
    public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);

    /// <summary>Gets the memory footprint of the driver; the stub always reports 0.</summary>
    public long GetMemoryFootprint() => 0;

    /// <summary>Flushes optional caches in the driver.</summary>
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
/// <summary>
/// A standalone <see cref="IDriver"/> (and <see cref="IWritable"/>) whose InitializeAsync
/// never completes, parking the actor in Connecting. Implements the interface directly
/// (not via <see cref="StubDriver"/>) so the never-completing Init is reached through the
/// polymorphic call the actor makes.
/// </summary>
private sealed class BlockingInitDriver : IDriver, IWritable
{
/// <summary>Set true the moment InitializeAsync is entered.</summary>
// volatile: the flag is set inside InitializeAsync and polled by the test via AwaitCondition.
public volatile bool InitializeStarted;
// Nothing in this class ever completes this source, so its Task stays pending indefinitely.
private readonly TaskCompletionSource _gate = new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>Gets the driver instance ID.</summary>
public string DriverInstanceId => "blocking-init-driver";
/// <summary>Gets the driver type.</summary>
public string DriverType => "Stub";
/// <summary>
/// Never completes — keeps the actor in Connecting. The cancellation token is not observed,
/// so cancelling it does not release the returned task.
/// </summary>
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
InitializeStarted = true;
return _gate.Task;
}
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) =>
Task.CompletedTask;
/// <summary>Shuts down the driver.</summary>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Gets the health status of the driver; always reports Healthy.</summary>
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver; always 0.</summary>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver.</summary>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>
/// Writes the specified requests (never reached — actor fast-fails in Connecting).
/// If it were called, it would return one result with status code 0 per request.
/// </summary>
public Task> WriteAsync(
IReadOnlyList writes, CancellationToken cancellationToken)
{
IReadOnlyList results = writes.Select(_ => new WriteResult(0u)).ToList();
return Task.FromResult(results);
}
}
/// <summary>
/// <see cref="StubDriver"/> that also implements <see cref="IWritable"/>: it records every
/// write request it receives and answers each one with <see cref="NextStatusCode"/>.
/// </summary>
private sealed class WritableStubDriver : StubDriver, IWritable
{
/// <summary>Gets or sets the next status code to return from write operations.</summary>
// NOTE(review): 0u is presumably the "good" status code — confirm against WriteResult.
public uint NextStatusCode { get; set; } = 0u;
/// <summary>Gets the list of write requests received.</summary>
// Plain List with no synchronization; appended to from WriteAsync.
public List Writes { get; } = new();
/// <summary>
/// Writes the specified requests: records them in <see cref="Writes"/> and returns one
/// result per request, each carrying the current <see cref="NextStatusCode"/>.
/// </summary>
public Task> WriteAsync(
IReadOnlyList writes, CancellationToken cancellationToken)
{
Writes.AddRange(writes);
IReadOnlyList results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList();
return Task.FromResult(results);
}
}
/// <summary>
/// <see cref="StubDriver"/> that also implements <see cref="ISubscribable"/>: it counts
/// SubscribeAsync calls, always hands back the same stub handle, and lets a test raise
/// data-change events on demand.
/// </summary>
private sealed class SubscribableStubDriver : StubDriver, ISubscribable
{
/// <summary>Occurs when data changes.</summary>
public event EventHandler? OnDataChange;
// Single handle instance shared by every subscription and every fired data change.
private readonly StubHandle _handle = new();
/// <summary>Gets the number of subscribers to OnDataChange.</summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>Number of times SubscribeAsync was called.</summary>
// Incremented with Interlocked in SubscribeAsync; the tests poll it via AwaitCondition.
public int SubscribeCount;
/// <summary>
/// Subscribes to the specified full references. The stub ignores all arguments, counts the
/// call, and returns the shared stub handle as an already-completed task.
/// </summary>
public Task SubscribeAsync(
IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
Interlocked.Increment(ref SubscribeCount);
return Task.FromResult(_handle);
}
/// <summary>Unsubscribes from the specified subscription handle; a no-op in the stub.</summary>
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) =>
Task.CompletedTask;
/// <summary>
/// Fires a data change event with the specified parameters. Both timestamps of the snapshot
/// are set to the current UTC time; nothing happens when there are no subscribers.
/// </summary>
public void FireDataChange(string fullRef, object? value, uint statusCode)
{
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot));
}
/// <summary>Trivial subscription handle with a fixed diagnostic ID.</summary>
private sealed class StubHandle : ISubscriptionHandle
{
/// <summary>Gets the diagnostic ID of the subscription.</summary>
public string DiagnosticId => "stub-sub";
}
}
}