Files
scadalink-design/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs

649 lines
29 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using NSubstitute;
using NSubstitute.Core;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.DataConnectionLayer.Actors;
using ScadaLink.HealthMonitoring;
namespace ScadaLink.DataConnectionLayer.Tests;
/// <summary>
/// WP-6: Tests for DataConnectionActor Become/Stash state machine.
/// WP-9: Auto-reconnect and bad quality tests.
/// WP-10: Transparent re-subscribe tests.
/// WP-11: Write-back support tests.
/// WP-12: Tag path resolution with retry tests.
/// WP-13: Health reporting tests.
/// WP-14: Subscription lifecycle tests.
/// Task-4: Failover state machine tests.
/// </summary>
public class DataConnectionActorTests : TestKit
{
private readonly IDataConnection _mockAdapter;
private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _mockHealthCollector;
private readonly IDataConnectionFactory _mockFactory;
/// <summary>
/// Builds the per-test fixture: an Akka test system logging at DEBUG, NSubstitute stand-ins for
/// the adapter, health collector and adapter factory, and options with sub-second reconnect and
/// tag-resolution intervals.
/// </summary>
public DataConnectionActorTests()
    : base("akka.loglevel = DEBUG")
{
    _mockFactory = Substitute.For<IDataConnectionFactory>();
    _mockHealthCollector = Substitute.For<ISiteHealthCollector>();
    _mockAdapter = Substitute.For<IDataConnection>();

    // Millisecond-scale intervals — presumably chosen so retry paths run quickly under test.
    _options = new()
    {
        ReconnectInterval = TimeSpan.FromMilliseconds(100),
        TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
        WriteTimeout = TimeSpan.FromSeconds(5)
    };
}
/// <summary>
/// Spawns a <see cref="DataConnectionActor"/> in the test actor system, wired to the shared
/// mock adapter, options, health collector and factory.
/// </summary>
/// <param name="name">Used both as the connection name and as the actor name.</param>
/// <returns>A reference to the newly created actor.</returns>
private IActorRef CreateConnectionActor(string name = "test-conn")
{
    var props = Props.Create(() => new DataConnectionActor(
        name, _mockAdapter, _options, _mockHealthCollector, _mockFactory, "OpcUa"));

    return Sys.ActorOf(props, name);
}
/// <summary>
/// Spawns a <see cref="DataConnectionActor"/> configured with a primary endpoint and an
/// optional backup endpoint, using the shared options, health collector and factory.
/// </summary>
/// <param name="adapter">Adapter instance the actor starts with.</param>
/// <param name="name">Used both as the connection name and as the actor name.</param>
/// <param name="primaryConfig">Connection settings passed as the primary configuration.</param>
/// <param name="backupConfig">Connection settings passed as the backup configuration, or <c>null</c> for none.</param>
/// <param name="failoverRetryCount">Retry count forwarded to the actor's failover configuration.</param>
/// <returns>A reference to the newly created actor.</returns>
private IActorRef CreateFailoverActor(
    IDataConnection adapter,
    string name,
    IDictionary<string, string> primaryConfig,
    IDictionary<string, string>? backupConfig,
    int failoverRetryCount)
{
    var props = Props.Create(() => new DataConnectionActor(
        name,
        adapter,
        _options,
        _mockHealthCollector,
        _mockFactory,
        "OpcUa",
        primaryConfig,
        backupConfig,
        failoverRetryCount));

    return Sys.ActorOf(props, name);
}
/// <summary>
/// Fires the <c>Disconnected</c> event of an NSubstitute-backed <see cref="IDataConnection"/>,
/// invoking whatever handlers are currently attached to it.
/// </summary>
/// <param name="adapter">The substitute whose event is raised.</param>
private static void RaiseDisconnected(IDataConnection adapter) =>
    adapter.Disconnected += Raise.Event<Action>();
/// <summary>
/// WP-6: creating the actor must lead to a <c>ConnectAsync</c> call on the adapter
/// within two seconds.
/// </summary>
[Fact]
public void WP6_StartsInConnectingState_AttemptsConnect()
{
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    // Creating the actor is the act under test; the reference itself is not needed.
    _ = CreateConnectionActor();

    // Poll the substitute's call log until the connect attempt shows up.
    AwaitCondition(
        () => _mockAdapter.ReceivedCalls()
            .Any(c => c.GetMethodInfo().Name == nameof(IDataConnection.ConnectAsync)),
        TimeSpan.FromSeconds(2));
}
/// <summary>
/// WP-6: a subscribe request sent while the connect call is still pending gets no reply
/// until the connect completes, after which a <see cref="SubscribeTagsResponse"/> arrives.
/// </summary>
[Fact]
public void WP6_ConnectingState_StashesSubscribeRequests()
{
    // Arrange: keep ConnectAsync pending so the actor stays in its connecting phase.
    // RunContinuationsAsynchronously stops SetResult() from running the adapter's
    // connect continuation inline on the test thread.
    var connectGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(connectGate.Task);

    // Configure every stub before the actor exists so the substitute is never
    // reconfigured while another thread may be using it.
    _mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns("sub-001");

    var actor = CreateConnectionActor("stash-test");

    // Act: subscribe while the connect is still in flight — should be stashed.
    actor.Tell(new SubscribeTagsRequest(
        "corr1", "inst1", "stash-test", ["tag1"], DateTimeOffset.UtcNow));

    // Assert: no reply while connecting.
    ExpectNoMsg(TimeSpan.FromMilliseconds(200));

    // Completing the connect should release the stashed request.
    connectGate.SetResult();
    ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(2));
}
/// <summary>
/// WP-11: once the adapter has been asked to connect, a write request is answered with a
/// successful <see cref="WriteTagResponse"/> when the adapter reports success.
/// </summary>
[Fact]
public async Task WP11_ConnectedState_Write_ReturnsResult()
{
    const string connectionName = "write-test";

    bool ConnectWasAttempted() =>
        _mockAdapter.ReceivedCalls().Any(call => call.GetMethodInfo().Name == "ConnectAsync");

    // Arrange
    _mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
        .Returns(new WriteResult(true, null));
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    var connection = CreateConnectionActor(connectionName);

    AwaitCondition(ConnectWasAttempted, TimeSpan.FromSeconds(2));

    // Fixed pause giving the actor time to finish its transition after the connect call.
    await Task.Delay(TimeSpan.FromMilliseconds(200));

    // Act
    connection.Tell(new WriteTagRequest("corr1", connectionName, "tag1", 42, DateTimeOffset.UtcNow));

    // Assert
    var reply = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
    Assert.True(reply.Success);
}
/// <summary>
/// WP-11: when the adapter reports a failed write, the <see cref="WriteTagResponse"/> carries
/// <c>Success == false</c> and the adapter's error text.
/// </summary>
[Fact]
public async Task WP11_Write_Failure_ReturnedSynchronously()
{
    const string connectionName = "write-fail-test";

    // Arrange: connect succeeds, the write is rejected by the (mock) device.
    var rejected = new WriteResult(false, "Device offline");
    _mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
        .Returns(rejected);
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    var connection = CreateConnectionActor(connectionName);

    // Fixed pause so the actor has time to finish connecting.
    await Task.Delay(TimeSpan.FromMilliseconds(300));

    // Act
    connection.Tell(new WriteTagRequest("corr1", connectionName, "tag1", 42, DateTimeOffset.UtcNow));

    // Assert
    var reply = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
    Assert.False(reply.Success);
    Assert.Equal("Device offline", reply.ErrorMessage);
}
/// <summary>
/// WP-13: a <c>GetHealthReport</c> request is answered with a report carrying the connection
/// name and the status exposed by the adapter.
/// </summary>
[Fact]
public async Task WP13_HealthReport_ReturnsConnectionStatus()
{
    const string connectionName = "health-test";

    // Arrange
    _mockAdapter.Status.Returns(ConnectionHealth.Connected);
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    var connection = CreateConnectionActor(connectionName);

    // Fixed pause so the actor has time to finish connecting.
    await Task.Delay(TimeSpan.FromMilliseconds(300));

    // Act
    connection.Tell(new DataConnectionActor.GetHealthReport());

    // Assert
    var health = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(2));
    Assert.Equal(connectionName, health.ConnectionName);
    Assert.Equal(ConnectionHealth.Connected, health.Status);
}
// ── Task-4: Failover state machine tests ──
/// <summary>
/// Task-4: with a backup endpoint configured and <c>failoverRetryCount = 2</c>, failing primary
/// reconnects must eventually make the actor ask the factory for an adapter built from the
/// backup configuration.
/// </summary>
[Fact]
public async Task Task4_FailoverAfterNRetries_SwitchesToBackup()
{
    const string backupEndpoint = "opc.tcp://backup:4840";

    // Arrange: two adapters, two endpoint configurations.
    var primaryAdapter = Substitute.For<IDataConnection>();
    var backupAdapter = Substitute.For<IDataConnection>();
    var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
    var backupConfig = new Dictionary<string, string> { ["Endpoint"] = backupEndpoint };

    // Both adapters connect successfully at first.
    primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    // The factory hands out the backup adapter when asked for the backup endpoint.
    _mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(cfg => cfg["Endpoint"] == backupEndpoint))
        .Returns(backupAdapter);

    _ = CreateFailoverActor(primaryAdapter, "failover-test", primaryConfig, backupConfig, failoverRetryCount: 2);

    // Wait for the initial primary connect, then give the actor time to settle.
    AwaitCondition(
        () => primaryAdapter.ReceivedCalls().Any(call => call.GetMethodInfo().Name == "ConnectAsync"),
        TimeSpan.FromSeconds(2));
    await Task.Delay(TimeSpan.FromMilliseconds(200));

    // Act: from now on every primary connect fails, then the link drops.
    primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.FromException(new Exception("Connection refused")));
    RaiseDisconnected(primaryAdapter);

    // Assert: the factory is eventually asked for the backup endpoint.
    AwaitCondition(BackupAdapterRequested, TimeSpan.FromSeconds(5));

    bool BackupAdapterRequested()
    {
        foreach (var call in _mockFactory.ReceivedCalls())
        {
            if (call.GetMethodInfo().Name != "Create")
            {
                continue;
            }

            if (call.GetArguments()[1] is IDictionary<string, string> requested
                && requested["Endpoint"] == backupEndpoint)
            {
                return true;
            }
        }

        return false;
    }
}
/// <summary>
/// Task-4: with no backup configuration the actor keeps retrying the primary adapter and
/// never asks the factory for another adapter, even well past <c>failoverRetryCount</c>.
/// </summary>
[Fact]
public async Task Task4_SingleEndpoint_RetriesIndefinitely_NoFailover()
{
    // Arrange: primary only, no backup
    var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
    var primaryAdapter = Substitute.For<IDataConnection>();

    // First connect succeeds, all subsequent attempts fail.
    var connectCount = 0;
    primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(_ => Interlocked.Increment(ref connectCount) == 1
            ? Task.CompletedTask
            : Task.FromException(new Exception("Connection refused")));

    _ = CreateFailoverActor(primaryAdapter, "no-backup-test", primaryConfig, backupConfig: null, failoverRetryCount: 3);

    // The counter is incremented on another thread, so poll it with Volatile.Read to
    // pair with the Interlocked write instead of relying on a plain read.
    AwaitCondition(() => Volatile.Read(ref connectCount) >= 1, TimeSpan.FromSeconds(2));
    await Task.Delay(200);

    // Act: trigger disconnect — starts reconnect attempts.
    RaiseDisconnected(primaryAdapter);

    // Wait for many reconnect failures (well over the failoverRetryCount threshold).
    AwaitCondition(() => Volatile.Read(ref connectCount) >= 8, TimeSpan.FromSeconds(10));

    // Assert: factory is never called — there is no backup to fail over to.
    _mockFactory.DidNotReceive().Create(Arg.Any<string>(), Arg.Any<IDictionary<string, string>>());
}
/// <summary>
/// Task-4: with <c>failoverRetryCount = 1</c>, a failed primary reconnect makes the actor request
/// a backup adapter, and a subsequent failed backup reconnect makes it request a primary
/// adapter again.
/// </summary>
[Fact]
public async Task Task4_RoundRobin_BackToPrimary_AfterBackupFails()
{
    const string primaryEndpoint = "opc.tcp://primary:4840";
    const string backupEndpoint = "opc.tcp://backup:4840";

    // True once the factory has been asked to create an adapter for the given endpoint.
    bool FactoryCreatedFor(string endpoint) =>
        _mockFactory.ReceivedCalls().Any(c =>
            c.GetMethodInfo().Name == nameof(IDataConnectionFactory.Create) &&
            c.GetArguments()[1] is IDictionary<string, string> d &&
            d["Endpoint"] == endpoint);

    // Arrange: primary + backup, failoverRetryCount = 1
    var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = primaryEndpoint };
    var backupConfig = new Dictionary<string, string> { ["Endpoint"] = backupEndpoint };
    var primaryAdapter = Substitute.For<IDataConnection>();
    var backupAdapter = Substitute.For<IDataConnection>();
    var secondPrimaryAdapter = Substitute.For<IDataConnection>();

    // Primary: initial connect succeeds, every reconnect fails (triggers failover to backup).
    // A single counting stub is used; an earlier unconditional-success stub would be
    // overridden by this one and was therefore dead setup.
    var primaryConnectCount = 0;
    primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1
            ? Task.CompletedTask
            : Task.FromException(new Exception("Primary down")));

    // Factory: backup config → backupAdapter, primary config → secondPrimaryAdapter
    _mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == backupEndpoint))
        .Returns(backupAdapter);
    _mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == primaryEndpoint))
        .Returns(secondPrimaryAdapter);

    // Backup: first connect succeeds, every reconnect fails.
    var backupConnectCount = 0;
    backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(_ => Interlocked.Increment(ref backupConnectCount) == 1
            ? Task.CompletedTask
            : Task.FromException(new Exception("Backup down")));

    // Second primary adapter connects fine.
    secondPrimaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    _ = CreateFailoverActor(primaryAdapter, "roundrobin-test", primaryConfig, backupConfig, failoverRetryCount: 1);

    // Counters are incremented on other threads; poll them with Volatile.Read.
    AwaitCondition(() => Volatile.Read(ref primaryConnectCount) >= 1, TimeSpan.FromSeconds(2));
    await Task.Delay(200);

    // Disconnect primary → 1 failure → failover to backup
    RaiseDisconnected(primaryAdapter);
    AwaitCondition(() => FactoryCreatedFor(backupEndpoint), TimeSpan.FromSeconds(5));

    // Wait for backup to connect successfully
    AwaitCondition(() => Volatile.Read(ref backupConnectCount) >= 1, TimeSpan.FromSeconds(2));
    await Task.Delay(200);

    // Now disconnect backup → 1 failure → failover back to primary
    RaiseDisconnected(backupAdapter);
    AwaitCondition(() => FactoryCreatedFor(primaryEndpoint), TimeSpan.FromSeconds(5));
}
/// <summary>
/// Task-4: two separate outages, each with two failed reconnects followed by a success, must
/// not trigger failover when <c>failoverRetryCount = 3</c> — i.e. the factory is never asked
/// for another adapter.
/// </summary>
[Fact]
public async Task Task4_SuccessfulReconnect_ResetsFailureCounter()
{
    // Arrange: primary + backup, failoverRetryCount = 3
    var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
    var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
    var primaryAdapter = Substitute.For<IDataConnection>();

    // Connect script, by call number:
    //   1    initial connect → success
    //   2,3  reconnect failures
    //   4    reconnect success
    //   5,6  reconnect failures again
    //   7+   reconnect success again
    var connectCount = 0;
    primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(_ => Interlocked.Increment(ref connectCount) switch
        {
            2 or 3 or 5 or 6 => Task.FromException(new Exception("Fail")),
            _ => Task.CompletedTask
        });

    _ = CreateFailoverActor(primaryAdapter, "reset-counter-test", primaryConfig, backupConfig, failoverRetryCount: 3);

    // The counter is incremented on another thread; poll it with Volatile.Read.
    AwaitCondition(() => Volatile.Read(ref connectCount) >= 1, TimeSpan.FromSeconds(2));
    await Task.Delay(200);

    // First outage: 2 failures then success (calls 2,3,4).
    RaiseDisconnected(primaryAdapter);
    AwaitCondition(() => Volatile.Read(ref connectCount) >= 4, TimeSpan.FromSeconds(5));
    await Task.Delay(200);

    // Second outage: 2 more failures then success (calls 5,6,7).
    RaiseDisconnected(primaryAdapter);
    AwaitCondition(() => Volatile.Read(ref connectCount) >= 7, TimeSpan.FromSeconds(5));
    await Task.Delay(200);

    // Four failures in total exceed the threshold of 3, so the factory staying untouched
    // shows the failure count did not carry over across the successful reconnect.
    _mockFactory.DidNotReceive().Create(Arg.Any<string>(), Arg.Any<IDictionary<string, string>>());
}
/// <summary>
/// Task-4: a tag subscribed while on the primary adapter must be subscribed again on the
/// backup adapter after failover.
/// </summary>
[Fact]
public async Task Task4_ReSubscribeAll_CalledAfterFailoverReconnect()
{
    const string backupEndpoint = "opc.tcp://backup:4840";

    // Arrange: primary + backup, failoverRetryCount = 1
    var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
    var backupConfig = new Dictionary<string, string> { ["Endpoint"] = backupEndpoint };
    var primaryAdapter = Substitute.For<IDataConnection>();
    var backupAdapter = Substitute.For<IDataConnection>();

    // Primary: initial connect succeeds, every reconnect fails.
    var primaryConnectCount = 0;
    primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1
            ? Task.CompletedTask
            : Task.FromException(new Exception("Primary down")));

    // Primary subscribe and read both succeed.
    primaryAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns("sub-primary-001");
    primaryAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(new ReadResult(true, new TagValue(42.0, QualityCode.Good, DateTimeOffset.UtcNow), null));

    // Factory returns the backup adapter for the backup endpoint.
    _mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == backupEndpoint))
        .Returns(backupAdapter);

    // Backup connect and subscribe succeed (subscribe is needed for the re-subscribe).
    backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    backupAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns("sub-backup-001");

    var actor = CreateFailoverActor(primaryAdapter, "resub-test", primaryConfig, backupConfig, failoverRetryCount: 1);

    // The counter is incremented on another thread; poll it with Volatile.Read.
    AwaitCondition(() => Volatile.Read(ref primaryConnectCount) >= 1, TimeSpan.FromSeconds(2));
    await Task.Delay(200);

    // Subscribe to a tag while connected on primary.
    actor.Tell(new SubscribeTagsRequest("corr1", "inst1", "resub-test", ["sensor/temp"], DateTimeOffset.UtcNow));
    ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));
    await primaryAdapter.Received().SubscribeAsync(
        "sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>());

    // Act: disconnect primary → 1 failure → failover to backup.
    RaiseDisconnected(primaryAdapter);

    // Wait for the backup adapter to be requested from the factory.
    AwaitCondition(() =>
        _mockFactory.ReceivedCalls().Any(c =>
            c.GetMethodInfo().Name == nameof(IDataConnectionFactory.Create) &&
            c.GetArguments()[1] is IDictionary<string, string> d &&
            d["Endpoint"] == backupEndpoint),
        TimeSpan.FromSeconds(5));

    // Wait for the re-subscribe to reach the backup adapter.
    AwaitCondition(() =>
        backupAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == nameof(IDataConnection.SubscribeAsync)),
        TimeSpan.FromSeconds(5));

    // Assert: the backup adapter was asked to subscribe to the same tag.
    await backupAdapter.Received().SubscribeAsync(
        "sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>());
}
// ── DataConnectionLayer-001: subscribe must not mutate actor state off-thread ──
/// <summary>
/// Simulates a slow adapter subscribe: waits one millisecond, then yields a unique
/// subscription id of the form <c>sub-</c> followed by a GUID in "N" format.
/// </summary>
/// <returns>The generated subscription id.</returns>
private static async Task<string> DelayedSubscribeAsync()
{
    // The brief pause lets concurrent subscribe tasks overlap, so that post-await state
    // mutations would race under the pre-fix implementation.
    await Task.Delay(TimeSpan.FromMilliseconds(1));

    var uniquePart = Guid.NewGuid();
    return $"sub-{uniquePart:N}";
}
/// <summary>
/// Regression test for DataConnectionLayer-001: 30 subscribe requests of 30 distinct tags each
/// must all be acknowledged, and the health report must count exactly 900 subscribed and
/// 900 resolved tags.
/// </summary>
[Fact]
public async Task DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters()
{
    // Background (DataConnectionLayer-001): HandleSubscribe used to mutate actor state
    // (_subscriptionIds, _totalSubscribed, _resolvedTags, the per-instance HashSet) from a
    // Task.Run background thread. Concurrent subscribes then raced on non-thread-safe
    // Dictionary/HashSet and non-atomic int++, losing increments or throwing. After the fix
    // every mutation is applied on the actor thread via a SubscribeCompleted message, so
    // the final counts are exact.
    const int instanceCount = 30;
    const int tagsPerInstance = 30;
    const int expectedTagTotal = instanceCount * tagsPerInstance;

    // Arrange
    _mockAdapter.Status.Returns(ConnectionHealth.Connected);
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    _mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(new ReadResult(false, null, null));
    _mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns(_ => DelayedSubscribeAsync());

    var connection = CreateConnectionActor("dcl001-concurrent");

    // Fixed pause so the actor has time to finish connecting.
    await Task.Delay(TimeSpan.FromMilliseconds(300));

    // Act: fire all subscribe requests back to back, each with its own distinct tags.
    foreach (var i in Enumerable.Range(0, instanceCount))
    {
        var tagPaths = new string[tagsPerInstance];
        for (var j = 0; j < tagsPerInstance; j++)
        {
            tagPaths[j] = $"inst{i}/tag{j}";
        }

        connection.Tell(new SubscribeTagsRequest(
            $"corr{i}", $"inst{i}", "dcl001-concurrent", tagPaths, DateTimeOffset.UtcNow));
    }

    // Assert: every subscribe must be acknowledged.
    var outstandingAcks = instanceCount;
    while (outstandingAcks > 0)
    {
        ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(15));
        outstandingAcks--;
    }

    connection.Tell(new DataConnectionActor.GetHealthReport());
    var health = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));

    // Every tag is distinct, so each is a fresh, resolved subscription.
    Assert.Equal(expectedTagTotal, health.TotalSubscribedTags);
    Assert.Equal(expectedTagTotal, health.ResolvedTags);
}
// ── DataConnectionLayer-004: subscribe-time failure classification ──
/// <summary>
/// Regression test for DataConnectionLayer-004: when subscribing a tag fails with a
/// non-connection exception, the subscriber receives a bad-quality
/// <see cref="TagValueUpdate"/> for that tag followed by a successful subscribe ack.
/// </summary>
[Fact]
public async Task DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber()
{
    // Background (DataConnectionLayer-004): the design doc (Tag Path Resolution, step 2)
    // requires an attribute whose tag fails to resolve at subscribe time to be marked
    // quality `bad`. The pre-fix code only logged and added the tag to _unresolvedTags,
    // so the Instance Actor never got a signal.
    const string connectionName = "dcl004-bad-quality";
    const string unresolvableTag = "missing/tag";

    // Arrange: connected adapter whose subscribe fails with a genuine node-not-found
    // (a non-connection exception).
    _mockAdapter.Status.Returns(ConnectionHealth.Connected);
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    _mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(new ReadResult(false, null, null));
    _mockAdapter.SubscribeAsync(unresolvableTag, Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns(Task.FromException<string>(new KeyNotFoundException("node not found")));

    var connection = CreateConnectionActor(connectionName);

    // Fixed pause so the actor has time to finish connecting.
    await Task.Delay(TimeSpan.FromMilliseconds(300));

    // Act
    connection.Tell(new SubscribeTagsRequest(
        "c1", "inst1", connectionName, [unresolvableTag], DateTimeOffset.UtcNow));

    // Assert: two messages are expected, in this order — first the bad-quality update
    // for the tag, then the subscribe ack.
    var badUpdate = ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
    Assert.Equal(unresolvableTag, badUpdate.TagPath);
    Assert.Equal(QualityCode.Bad, badUpdate.Quality);

    var subscribeAck = ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
    Assert.True(subscribeAck.Success);
}
/// <summary>
/// Regression test for DataConnectionLayer-004: a subscribe that fails with
/// <see cref="InvalidOperationException"/> ("not connected") must lead to a second
/// <c>ConnectAsync</c> call on the adapter within five seconds.
/// </summary>
[Fact]
public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry()
{
    // Background (DataConnectionLayer-004): a subscribe failing because the adapter is not
    // connected (InvalidOperationException from EnsureConnected) is a connection problem,
    // not a bad tag path. The pre-fix code misclassified it as an unresolved tag and retried
    // it on the 10s tag-resolution timer; the fix drives the reconnection state machine.
    const string connectionName = "dcl004-conn-level";

    int ConnectAttempts() =>
        _mockAdapter.ReceivedCalls().Count(call => call.GetMethodInfo().Name == "ConnectAsync");

    // Arrange: adapter reports Connected but every subscribe throws "not connected".
    _mockAdapter.Status.Returns(ConnectionHealth.Connected);
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    _mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(new ReadResult(false, null, null));
    _mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns(Task.FromException<string>(
            new InvalidOperationException("OPC UA client is not connected.")));

    var connection = CreateConnectionActor(connectionName);

    // Fixed pause so the actor has time to finish connecting.
    await Task.Delay(TimeSpan.FromMilliseconds(300));

    // Act
    connection.Tell(new SubscribeTagsRequest(
        "c1", "inst1", connectionName, ["some/tag"], DateTimeOffset.UtcNow));

    // Assert: the connection-level failure must drive the actor into Reconnecting, which
    // re-attempts ConnectAsync. Pre-fix the actor stayed Connected and only armed the
    // tag-resolution timer, so ConnectAsync was called exactly once.
    AwaitCondition(() => ConnectAttempts() >= 2, TimeSpan.FromSeconds(5));
}
// ── DataConnectionLayer-005: WriteTimeout must bound a hung write ──
/// <summary>
/// Regression test for DataConnectionLayer-005: a write that never completes on its own must
/// still produce a failed <see cref="WriteTagResponse"/> whose error mentions "timeout".
/// </summary>
[Fact]
public async Task DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously()
{
    // Background (DataConnectionLayer-005): HandleWrite called WriteAsync with no
    // CancellationToken and no timeout, so a hung device write never produced a
    // WriteTagResponse and the calling script blocked until its own Ask-timeout with no
    // DCL-level error. After the fix, _options.WriteTimeout bounds the write and a timeout
    // is surfaced as a failed WriteTagResponse.
    _options.WriteTimeout = TimeSpan.FromMilliseconds(300);

    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    // WriteAsync never completes unless its cancellation token fires.
    _mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
        .Returns(ci =>
        {
            var ct = ci.Arg<CancellationToken>();

            // RunContinuationsAsynchronously keeps the awaiting code from running inline
            // on whichever thread triggers the cancellation callback.
            var tcs = new TaskCompletionSource<WriteResult>(TaskCreationOptions.RunContinuationsAsynchronously);
            ct.Register(() => tcs.TrySetCanceled(ct));
            return tcs.Task;
        });

    var actor = CreateConnectionActor("dcl005-write-timeout");
    await Task.Delay(300); // reach Connected state

    actor.Tell(new WriteTagRequest("corr1", "dcl005-write-timeout", "tag1", 42, DateTimeOffset.UtcNow));

    var response = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
    Assert.False(response.Success);

    // Fail with a clear message if no error text was supplied at all.
    Assert.NotNull(response.ErrorMessage);
    Assert.Contains("timeout", response.ErrorMessage, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Behavioural guard for DataConnectionLayer-001: with three tags that subscribe successfully
/// and two that fail, the health report must show 5 subscribed tags and 3 resolved tags.
/// </summary>
[Fact]
public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately()
{
    // The restructured subscribe must preserve the original accounting — failed tags
    // count toward TotalSubscribed but not ResolvedTags.
    _mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    _mockAdapter.Status.Returns(ConnectionHealth.Connected);

    // Tag paths starting with "bad" fail to subscribe; everything else succeeds.
    // Ordinal comparison: tag paths are identifiers, not culture-sensitive text.
    _mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns(ci => ((string)ci[0]).StartsWith("bad", StringComparison.Ordinal)
            ? Task.FromException<string>(new Exception("tag not found"))
            : Task.FromResult("sub-ok"));
    _mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(new ReadResult(false, null, null));

    var actor = CreateConnectionActor("dcl001-failed-tags");
    await Task.Delay(300);

    actor.Tell(new SubscribeTagsRequest(
        "c1", "inst1", "dcl001-failed-tags",
        ["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow));

    // Two genuine resolution failures now also push a bad-quality TagValueUpdate
    // to the subscriber (DataConnectionLayer-004); skip past those to the ack.
    var ack = FishForMessage<SubscribeTagsResponse>(_ => true, TimeSpan.FromSeconds(5));
    Assert.True(ack.Success);

    actor.Tell(new DataConnectionActor.GetHealthReport());
    var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
    Assert.Equal(5, report.TotalSubscribedTags); // all 5 tags tracked
    Assert.Equal(3, report.ResolvedTags);        // only the 3 good ones resolved
}
}