7b0b9c7365
Solution + 23 src projects + 26 test projects renamed; folders, csproj, namespaces, and ScadaLinkDbContext/ScadaBridgeDbContext class updated. ActorSystem "scadalink" → "scadabridge", Akka seed-node URLs migrated. SQL roles/logins, LDAP domains, CLI command name, and CLI config dir (~/.scadalink → ~/.scadabridge) also renamed. Build green; 5 Host.Tests fail awaiting SQL login rename in next commit. Pre-existing StaleTagMonitor timing flakes unchanged. Rename script committed at tools/rename-to-scadabridge.sh.
1325 lines
66 KiB
C#
1325 lines
66 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using NSubstitute;
|
|
using NSubstitute.Core;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
|
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors;
|
|
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests;
|
|
|
|
/// <summary>
|
|
/// WP-6: Tests for DataConnectionActor Become/Stash state machine.
|
|
/// WP-9: Auto-reconnect and bad quality tests.
|
|
/// WP-10: Transparent re-subscribe tests.
|
|
/// WP-11: Write-back support tests.
|
|
/// WP-12: Tag path resolution with retry tests.
|
|
/// WP-13: Health reporting tests.
|
|
/// WP-14: Subscription lifecycle tests.
|
|
/// Task-4: Failover state machine tests.
|
|
/// </summary>
|
|
public class DataConnectionActorTests : TestKit
|
|
{
|
|
private readonly IDataConnection _mockAdapter;
|
|
private readonly DataConnectionOptions _options;
|
|
private readonly ISiteHealthCollector _mockHealthCollector;
|
|
private readonly IDataConnectionFactory _mockFactory;
|
|
|
|
public DataConnectionActorTests()
|
|
: base(@"akka.loglevel = DEBUG")
|
|
{
|
|
_mockAdapter = Substitute.For<IDataConnection>();
|
|
_mockHealthCollector = Substitute.For<ISiteHealthCollector>();
|
|
_mockFactory = Substitute.For<IDataConnectionFactory>();
|
|
_options = new DataConnectionOptions
|
|
{
|
|
ReconnectInterval = TimeSpan.FromMilliseconds(100),
|
|
TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
|
|
WriteTimeout = TimeSpan.FromSeconds(5)
|
|
};
|
|
}
|
|
|
|
private IActorRef CreateConnectionActor(string name = "test-conn")
|
|
{
|
|
return Sys.ActorOf(Props.Create(() =>
|
|
new DataConnectionActor(name, _mockAdapter, _options, _mockHealthCollector,
|
|
_mockFactory, "OpcUa")), name);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a DataConnectionActor with primary/backup failover configuration.
|
|
/// </summary>
|
|
private IActorRef CreateFailoverActor(
|
|
IDataConnection adapter,
|
|
string name,
|
|
IDictionary<string, string> primaryConfig,
|
|
IDictionary<string, string>? backupConfig,
|
|
int failoverRetryCount)
|
|
{
|
|
return Sys.ActorOf(Props.Create(() =>
|
|
new DataConnectionActor(
|
|
name, adapter, _options, _mockHealthCollector, _mockFactory, "OpcUa",
|
|
primaryConfig, backupConfig, failoverRetryCount)), name);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Raises the Disconnected event on a NSubstitute mock IDataConnection.
|
|
/// </summary>
|
|
private static void RaiseDisconnected(IDataConnection adapter)
|
|
{
|
|
adapter.Disconnected += Raise.Event<Action>();
|
|
}
|
|
|
|
[Fact]
|
|
public void WP6_StartsInConnectingState_AttemptsConnect()
|
|
{
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var actor = CreateConnectionActor();
|
|
|
|
// Give it time to attempt connection
|
|
AwaitCondition(() =>
|
|
_mockAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "ConnectAsync"),
|
|
TimeSpan.FromSeconds(2));
|
|
}
|
|
|
|
[Fact]
|
|
public void WP6_ConnectingState_StashesSubscribeRequests()
|
|
{
|
|
// Make connect hang so we stay in Connecting
|
|
var tcs = new TaskCompletionSource();
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(tcs.Task);
|
|
|
|
var actor = CreateConnectionActor("stash-test");
|
|
|
|
// Send subscribe while connecting — should be stashed
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"corr1", "inst1", "stash-test", ["tag1"], DateTimeOffset.UtcNow));
|
|
|
|
// No response yet (stashed)
|
|
ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
|
|
|
// Complete connection — should unstash and process
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns("sub-001");
|
|
|
|
tcs.SetResult();
|
|
|
|
// Now we should get the response
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(2));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task WP11_ConnectedState_Write_ReturnsResult()
|
|
{
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
|
|
.Returns(new WriteResult(true, null));
|
|
|
|
var actor = CreateConnectionActor("write-test");
|
|
|
|
// Wait for connected state
|
|
AwaitCondition(() =>
|
|
_mockAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "ConnectAsync"),
|
|
TimeSpan.FromSeconds(2));
|
|
|
|
// Small delay for state transition
|
|
await Task.Delay(200);
|
|
|
|
actor.Tell(new WriteTagRequest("corr1", "write-test", "tag1", 42, DateTimeOffset.UtcNow));
|
|
|
|
var response = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
|
|
Assert.True(response.Success);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task WP11_Write_Failure_ReturnedSynchronously()
|
|
{
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
|
|
.Returns(new WriteResult(false, "Device offline"));
|
|
|
|
var actor = CreateConnectionActor("write-fail-test");
|
|
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new WriteTagRequest("corr1", "write-fail-test", "tag1", 42, DateTimeOffset.UtcNow));
|
|
|
|
var response = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
|
|
Assert.False(response.Success);
|
|
Assert.Equal("Device offline", response.ErrorMessage);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task WP13_HealthReport_ReturnsConnectionStatus()
|
|
{
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
|
|
var actor = CreateConnectionActor("health-test");
|
|
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
|
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(2));
|
|
Assert.Equal("health-test", report.ConnectionName);
|
|
Assert.Equal(ConnectionHealth.Connected, report.Status);
|
|
}
|
|
|
|
// ── Task-4: Failover state machine tests ──
|
|
|
|
[Fact]
|
|
public async Task Task4_FailoverAfterNRetries_SwitchesToBackup()
|
|
{
|
|
// Arrange: primary + backup, failoverRetryCount = 2
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var backupAdapter = Substitute.For<IDataConnection>();
|
|
|
|
// Initial connect succeeds on primary
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
// Factory returns backup adapter when called with backup config
|
|
_mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
|
|
.Returns(backupAdapter);
|
|
|
|
// Backup adapter connect succeeds (so failover can complete)
|
|
backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "failover-test", primaryConfig, backupConfig, failoverRetryCount: 2);
|
|
|
|
// Wait for initial connection on primary
|
|
AwaitCondition(() =>
|
|
primaryAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "ConnectAsync"),
|
|
TimeSpan.FromSeconds(2));
|
|
await Task.Delay(200); // State transition to Connected
|
|
|
|
// Now make primary reconnect fail
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException(new Exception("Connection refused")));
|
|
|
|
// Trigger disconnect
|
|
RaiseDisconnected(primaryAdapter);
|
|
|
|
// Wait for failover: after 2 failures, factory should be called with backup config
|
|
AwaitCondition(() =>
|
|
_mockFactory.ReceivedCalls().Any(c =>
|
|
c.GetMethodInfo().Name == "Create" &&
|
|
c.GetArguments()[1] is IDictionary<string, string> d &&
|
|
d["Endpoint"] == "opc.tcp://backup:4840"),
|
|
TimeSpan.FromSeconds(5));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Task4_SingleEndpoint_RetriesIndefinitely_NoFailover()
|
|
{
|
|
// Arrange: primary only, no backup
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var connectCount = 0;
|
|
|
|
// First connect succeeds, all subsequent fail
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(callInfo =>
|
|
{
|
|
var count = Interlocked.Increment(ref connectCount);
|
|
if (count == 1) return Task.CompletedTask;
|
|
return Task.FromException(new Exception("Connection refused"));
|
|
});
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "no-backup-test", primaryConfig, backupConfig: null, failoverRetryCount: 3);
|
|
|
|
// Wait for initial connection
|
|
AwaitCondition(() => connectCount >= 1, TimeSpan.FromSeconds(2));
|
|
await Task.Delay(200);
|
|
|
|
// Trigger disconnect — starts reconnect attempts
|
|
RaiseDisconnected(primaryAdapter);
|
|
|
|
// Wait for many reconnect failures (well over the failoverRetryCount threshold)
|
|
AwaitCondition(() => connectCount >= 8, TimeSpan.FromSeconds(10));
|
|
|
|
// Factory should never be called — no backup to fail over to
|
|
_mockFactory.DidNotReceive().Create(Arg.Any<string>(), Arg.Any<IDictionary<string, string>>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Task4_RoundRobin_BackToPrimary_AfterBackupFails()
|
|
{
|
|
// Arrange: primary + backup, failoverRetryCount = 1
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var backupAdapter = Substitute.For<IDataConnection>();
|
|
var secondPrimaryAdapter = Substitute.For<IDataConnection>();
|
|
|
|
// Initial connect on primary succeeds
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
// After disconnect, primary reconnect fails (triggers failover to backup)
|
|
var primaryConnectCount = 0;
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(callInfo =>
|
|
{
|
|
var count = Interlocked.Increment(ref primaryConnectCount);
|
|
if (count == 1) return Task.CompletedTask; // Initial connect
|
|
return Task.FromException(new Exception("Primary down")); // Reconnect fails
|
|
});
|
|
|
|
// Factory: backup config → backupAdapter, primary config → secondPrimaryAdapter
|
|
_mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
|
|
.Returns(backupAdapter);
|
|
_mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == "opc.tcp://primary:4840"))
|
|
.Returns(secondPrimaryAdapter);
|
|
|
|
// Backup connect succeeds first time, then fails on reconnect
|
|
var backupConnectCount = 0;
|
|
backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(callInfo =>
|
|
{
|
|
var count = Interlocked.Increment(ref backupConnectCount);
|
|
if (count == 1) return Task.CompletedTask; // First backup connect succeeds
|
|
return Task.FromException(new Exception("Backup down")); // Backup reconnect fails
|
|
});
|
|
|
|
// Second primary adapter connects fine
|
|
secondPrimaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "roundrobin-test", primaryConfig, backupConfig, failoverRetryCount: 1);
|
|
|
|
// Wait for initial primary connect
|
|
AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
|
|
await Task.Delay(200);
|
|
|
|
// Disconnect primary → 1 failure → failover to backup
|
|
RaiseDisconnected(primaryAdapter);
|
|
|
|
// Wait for backup adapter creation
|
|
AwaitCondition(() =>
|
|
_mockFactory.ReceivedCalls().Any(c =>
|
|
c.GetMethodInfo().Name == "Create" &&
|
|
c.GetArguments()[1] is IDictionary<string, string> d &&
|
|
d["Endpoint"] == "opc.tcp://backup:4840"),
|
|
TimeSpan.FromSeconds(5));
|
|
|
|
// Wait for backup to connect successfully
|
|
AwaitCondition(() => backupConnectCount >= 1, TimeSpan.FromSeconds(2));
|
|
await Task.Delay(200);
|
|
|
|
// Now disconnect backup → 1 failure → failover back to primary
|
|
RaiseDisconnected(backupAdapter);
|
|
|
|
// Wait for primary adapter re-creation
|
|
AwaitCondition(() =>
|
|
_mockFactory.ReceivedCalls().Any(c =>
|
|
c.GetMethodInfo().Name == "Create" &&
|
|
c.GetArguments()[1] is IDictionary<string, string> d &&
|
|
d["Endpoint"] == "opc.tcp://primary:4840"),
|
|
TimeSpan.FromSeconds(5));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Task4_SuccessfulReconnect_ResetsFailureCounter()
|
|
{
|
|
// Arrange: primary + backup, failoverRetryCount = 3
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var connectCount = 0;
|
|
|
|
// First connect succeeds, then 2 failures, then success, then 2 more failures, then success
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(callInfo =>
|
|
{
|
|
var count = Interlocked.Increment(ref connectCount);
|
|
// count 1: initial connect → success
|
|
// count 2,3: reconnect failures
|
|
// count 4: reconnect success
|
|
// count 5,6: reconnect failures again
|
|
// count 7: reconnect success again
|
|
return count switch
|
|
{
|
|
1 => Task.CompletedTask,
|
|
2 or 3 => Task.FromException(new Exception("Fail")),
|
|
4 => Task.CompletedTask,
|
|
5 or 6 => Task.FromException(new Exception("Fail")),
|
|
_ => Task.CompletedTask
|
|
};
|
|
});
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "reset-counter-test", primaryConfig, backupConfig, failoverRetryCount: 3);
|
|
|
|
// Wait for initial connect
|
|
AwaitCondition(() => connectCount >= 1, TimeSpan.FromSeconds(2));
|
|
await Task.Delay(200);
|
|
|
|
// Disconnect: triggers 1 unstable disconnect + 2 failures then success (count 2,3,4)
|
|
RaiseDisconnected(primaryAdapter);
|
|
|
|
// Wait for successful reconnect (count 4)
|
|
AwaitCondition(() => connectCount >= 4, TimeSpan.FromSeconds(5));
|
|
await Task.Delay(200);
|
|
|
|
// Disconnect again: triggers 2 more failures then success (count 5,6,7)
|
|
RaiseDisconnected(primaryAdapter);
|
|
|
|
// Wait for second successful reconnect (count 7)
|
|
AwaitCondition(() => connectCount >= 7, TimeSpan.FromSeconds(5));
|
|
await Task.Delay(200);
|
|
|
|
// Factory should never be called — connection failures counter resets on each
|
|
// successful reconnect, and unstable disconnect counter is separate
|
|
_mockFactory.DidNotReceive().Create(Arg.Any<string>(), Arg.Any<IDictionary<string, string>>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Task4_ReSubscribeAll_CalledAfterFailoverReconnect()
|
|
{
|
|
// Arrange: primary + backup, failoverRetryCount = 1
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var backupAdapter = Substitute.For<IDataConnection>();
|
|
|
|
// Primary initial connect succeeds
|
|
var primaryConnectCount = 0;
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(callInfo =>
|
|
{
|
|
var count = Interlocked.Increment(ref primaryConnectCount);
|
|
if (count == 1) return Task.CompletedTask;
|
|
return Task.FromException(new Exception("Primary down"));
|
|
});
|
|
|
|
// Primary subscribe succeeds
|
|
primaryAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns("sub-primary-001");
|
|
|
|
// Primary read succeeds (for initial read after subscribe)
|
|
primaryAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(true, new TagValue(42.0, QualityCode.Good, DateTimeOffset.UtcNow), null));
|
|
|
|
// Factory returns backup adapter
|
|
_mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
|
|
.Returns(backupAdapter);
|
|
|
|
// Backup connect succeeds
|
|
backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
// Backup subscribe succeeds (for re-subscribe after failover)
|
|
backupAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns("sub-backup-001");
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "resub-test", primaryConfig, backupConfig, failoverRetryCount: 1);
|
|
|
|
// Wait for initial connect
|
|
AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
|
|
await Task.Delay(200);
|
|
|
|
// Subscribe to tags while connected on primary
|
|
actor.Tell(new SubscribeTagsRequest("corr1", "inst1", "resub-test", ["sensor/temp"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));
|
|
|
|
// Verify primary adapter received subscribe call
|
|
await primaryAdapter.Received().SubscribeAsync(
|
|
"sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>());
|
|
|
|
// Disconnect primary → 1 failure → failover to backup
|
|
RaiseDisconnected(primaryAdapter);
|
|
|
|
// Wait for backup adapter creation and connect
|
|
AwaitCondition(() =>
|
|
_mockFactory.ReceivedCalls().Any(c =>
|
|
c.GetMethodInfo().Name == "Create" &&
|
|
c.GetArguments()[1] is IDictionary<string, string> d &&
|
|
d["Endpoint"] == "opc.tcp://backup:4840"),
|
|
TimeSpan.FromSeconds(5));
|
|
|
|
// Wait for ReSubscribeAll to fire on backup adapter
|
|
AwaitCondition(() =>
|
|
backupAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "SubscribeAsync"),
|
|
TimeSpan.FromSeconds(5));
|
|
|
|
// Verify backup adapter received SubscribeAsync for the same tag
|
|
await backupAdapter.Received().SubscribeAsync(
|
|
"sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
// ── DataConnectionLayer-001: subscribe must not mutate actor state off-thread ──
|
|
|
|
private static async Task<string> DelayedSubscribeAsync()
|
|
{
|
|
// A short delay so concurrent subscribe background tasks pile up and their
|
|
// post-await state mutations would race under the pre-fix implementation.
|
|
await Task.Delay(1);
|
|
return "sub-" + Guid.NewGuid().ToString("N");
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters()
|
|
{
|
|
// Regression test for DataConnectionLayer-001. HandleSubscribe used to mutate
|
|
// actor state (_subscriptionIds, _totalSubscribed, _resolvedTags, the per-instance
|
|
// HashSet) from a Task.Run background thread. Many concurrent subscribes then race
|
|
// on non-thread-safe Dictionary/HashSet and on non-atomic int++ — losing increments
|
|
// or throwing. After the fix every mutation is applied on the actor thread via a
|
|
// SubscribeCompleted message, so the final counts are exact.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ => DelayedSubscribeAsync());
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl001-concurrent");
|
|
await Task.Delay(300); // reach Connected state
|
|
|
|
const int instances = 30;
|
|
const int tagsPerInstance = 30;
|
|
for (var i = 0; i < instances; i++)
|
|
{
|
|
var tags = Enumerable.Range(0, tagsPerInstance)
|
|
.Select(j => $"inst{i}/tag{j}")
|
|
.ToArray();
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
$"corr{i}", $"inst{i}", "dcl001-concurrent", tags, DateTimeOffset.UtcNow));
|
|
}
|
|
|
|
// Every subscribe must be acknowledged.
|
|
for (var i = 0; i < instances; i++)
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(15));
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));
|
|
|
|
// Every tag is distinct, so each is a fresh, resolved subscription.
|
|
Assert.Equal(instances * tagsPerInstance, report.TotalSubscribedTags);
|
|
Assert.Equal(instances * tagsPerInstance, report.ResolvedTags);
|
|
}
|
|
|
|
// ── DataConnectionLayer-004: subscribe-time failure classification ──
|
|
|
|
[Fact]
|
|
public async Task DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber()
|
|
{
|
|
// Regression test for DataConnectionLayer-004. When a tag genuinely fails to
|
|
// resolve at subscribe time, the design doc (Tag Path Resolution, step 2)
|
|
// requires the attribute to be marked quality `bad`. The pre-fix code only
|
|
// logged and added the tag to _unresolvedTags — the Instance Actor never got
|
|
// a signal. After the fix, a bad-quality TagValueUpdate is pushed.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
// Genuine node-not-found: a non-connection exception.
|
|
_mockAdapter.SubscribeAsync("missing/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException<string>(new KeyNotFoundException("node not found")));
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl004-bad-quality");
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "inst1", "dcl004-bad-quality", ["missing/tag"], DateTimeOffset.UtcNow));
|
|
|
|
// Two messages arrive: the subscribe ack and a bad-quality update for the tag.
|
|
var bad = ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
|
|
Assert.Equal("missing/tag", bad.TagPath);
|
|
Assert.Equal(QualityCode.Bad, bad.Quality);
|
|
|
|
var ack = ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
Assert.True(ack.Success);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry()
|
|
{
|
|
// Regression test for DataConnectionLayer-004. A subscribe failing because the
|
|
// adapter is not connected (InvalidOperationException from EnsureConnected) is
|
|
// a connection problem, not a bad tag path. The pre-fix code misclassified it
|
|
// as an unresolved tag and retried it on the 10s tag-resolution timer. After
|
|
// the fix it drives the reconnection state machine instead.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException<string>(
|
|
new InvalidOperationException("OPC UA client is not connected.")));
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl004-conn-level");
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "inst1", "dcl004-conn-level", ["some/tag"], DateTimeOffset.UtcNow));
|
|
|
|
// The connection-level failure must drive the actor into Reconnecting, which
|
|
// re-attempts ConnectAsync. Pre-fix the actor stayed Connected and only armed
|
|
// the tag-resolution timer, so ConnectAsync is called exactly once.
|
|
AwaitCondition(() =>
|
|
_mockAdapter.ReceivedCalls().Count(c => c.GetMethodInfo().Name == "ConnectAsync") >= 2,
|
|
TimeSpan.FromSeconds(5));
|
|
}
|
|
|
|
// ── DataConnectionLayer-005: WriteTimeout must bound a hung write ──
|
|
|
|
[Fact]
|
|
public async Task DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously()
|
|
{
|
|
// Regression test for DataConnectionLayer-005. HandleWrite called WriteAsync
|
|
// with no CancellationToken and no timeout, so a hung device write never
|
|
// produced a WriteTagResponse. The calling script would block until its own
|
|
// Ask-timeout with no DCL-level error. After the fix, _options.WriteTimeout
|
|
// bounds the write and a timeout is surfaced as a failed WriteTagResponse.
|
|
_options.WriteTimeout = TimeSpan.FromMilliseconds(300);
|
|
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
// WriteAsync never completes unless its cancellation token fires.
|
|
_mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
|
|
.Returns(ci =>
|
|
{
|
|
var ct = ci.Arg<CancellationToken>();
|
|
var tcs = new TaskCompletionSource<WriteResult>();
|
|
ct.Register(() => tcs.TrySetCanceled(ct));
|
|
return tcs.Task;
|
|
});
|
|
|
|
var actor = CreateConnectionActor("dcl005-write-timeout");
|
|
await Task.Delay(300); // reach Connected state
|
|
|
|
actor.Tell(new WriteTagRequest("corr1", "dcl005-write-timeout", "tag1", 42, DateTimeOffset.UtcNow));
|
|
|
|
var response = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
|
|
Assert.False(response.Success);
|
|
Assert.Contains("timeout", response.ErrorMessage, StringComparison.OrdinalIgnoreCase);
|
|
}
|
|
|
|
// ── DataConnectionLayer-006: quality counters must not drift after unsubscribe/reconnect ──
|
|
|
|
[Fact]
|
|
public async Task DCL006_DisconnectAfterUnsubscribe_BadQualityCountMatchesRemainingTags()
|
|
{
|
|
// Regression test for DataConnectionLayer-006. _lastTagQuality and the three
|
|
// quality counters were never cleaned up on unsubscribe, so a tag removed via
|
|
// HandleUnsubscribe lingered in _lastTagQuality. PushBadQualityForAllTags then
|
|
// set _tagsBadQuality = _lastTagQuality.Count, counting the dropped tag and
|
|
// drifting the bad-quality count above the number of currently subscribed tags.
|
|
var callbacks = new System.Collections.Concurrent.ConcurrentDictionary<string, SubscriptionCallback>();
|
|
var connectCount = 0;
|
|
var reconnectGate = new TaskCompletionSource();
|
|
// First connect succeeds; the reconnect after the disconnect hangs so the actor
|
|
// stays in Reconnecting and ReSubscribeAll does not run before the assertion.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ => Interlocked.Increment(ref connectCount) == 1
|
|
? Task.CompletedTask
|
|
: reconnectGate.Task);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(ci =>
|
|
{
|
|
callbacks[(string)ci[0]] = (SubscriptionCallback)ci[1];
|
|
return Task.FromResult("sub-" + (string)ci[0]);
|
|
});
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl006-drift");
|
|
await Task.Delay(300);
|
|
|
|
// Two instances, one tag each.
|
|
actor.Tell(new SubscribeTagsRequest("c1", "instA", "dcl006-drift", ["tagA"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
actor.Tell(new SubscribeTagsRequest("c2", "instB", "dcl006-drift", ["tagB"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
// Push a Good value for each tag so both land in _lastTagQuality.
|
|
AwaitCondition(() => callbacks.ContainsKey("tagA") && callbacks.ContainsKey("tagB"),
|
|
TimeSpan.FromSeconds(3));
|
|
callbacks["tagA"]("tagA", new TagValue(1, QualityCode.Good, DateTimeOffset.UtcNow));
|
|
callbacks["tagB"]("tagB", new TagValue(2, QualityCode.Good, DateTimeOffset.UtcNow));
|
|
await Task.Delay(200);
|
|
|
|
// Unsubscribe instance B — tagB is no longer subscribed by anyone.
|
|
actor.Tell(new UnsubscribeTagsRequest("c3", "instB", "dcl006-drift", DateTimeOffset.UtcNow));
|
|
await Task.Delay(200);
|
|
|
|
_mockHealthCollector.ClearReceivedCalls();
|
|
|
|
// Disconnect — PushBadQualityForAllTags runs (the reconnect hangs on the gate,
|
|
// so the actor stays in Reconnecting and ReSubscribeAll does not run).
|
|
RaiseDisconnected(_mockAdapter);
|
|
await Task.Delay(300);
|
|
|
|
// PushBadQualityForAllTags must report exactly 1 bad tag (only tagA is still
|
|
// subscribed). Pre-fix tagB lingered in _lastTagQuality and bad was reported as 2.
|
|
var qualityCall = _mockHealthCollector.ReceivedCalls()
|
|
.Where(c => c.GetMethodInfo().Name == "UpdateTagQuality")
|
|
.FirstOrDefault();
|
|
Assert.NotNull(qualityCall);
|
|
var args = qualityCall!.GetArguments();
|
|
var bad = (int)args[2]!;
|
|
Assert.Equal(1, bad);
|
|
|
|
reconnectGate.SetCanceled();
|
|
}
|
|
|
|
// ── DataConnectionLayer-010: tag-resolution retry must not double-dispatch ──
|
|
|
|
[Fact]
|
|
public async Task DCL010_TagResolutionRetry_DoesNotIssueDuplicateConcurrentSubscribes()
|
|
{
|
|
// Regression test for DataConnectionLayer-010. HandleRetryTagResolution fired a
|
|
// SubscribeAsync for every unresolved tag without removing it from _unresolvedTags
|
|
// first. A slow SubscribeAsync overlapping the next retry tick produced duplicate
|
|
// concurrent subscribe attempts for the same tag, leaking the first monitored
|
|
// item / subscription id. After the fix a tag in flight is excluded from the
|
|
// next retry until its attempt completes.
|
|
_options.TagResolutionRetryInterval = TimeSpan.FromMilliseconds(100);
|
|
|
|
var subscribeGate = new TaskCompletionSource<string>();
|
|
var subscribeCalls = 0;
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync("slow/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(ci =>
|
|
{
|
|
var n = Interlocked.Increment(ref subscribeCalls);
|
|
// First call (initial subscribe) fails genuinely → unresolved.
|
|
if (n == 1) return Task.FromException<string>(new KeyNotFoundException("not found yet"));
|
|
// Subsequent calls are retry attempts — block on the gate so they stay
|
|
// in flight across multiple retry ticks.
|
|
return subscribeGate.Task;
|
|
});
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl010-retry");
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new SubscribeTagsRequest("c1", "inst1", "dcl010-retry", ["slow/tag"], DateTimeOffset.UtcNow));
|
|
// Initial subscribe fails → bad-quality push then ack.
|
|
ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
// Let several retry ticks (100ms each) elapse while the first retry is blocked.
|
|
await Task.Delay(600);
|
|
|
|
// Exactly one retry attempt should be in flight: 1 initial + 1 retry = 2 total.
|
|
// Pre-fix, every 100ms tick dispatched another → far more than 2.
|
|
Assert.Equal(2, Volatile.Read(ref subscribeCalls));
|
|
|
|
subscribeGate.SetCanceled();
|
|
}
|
|
|
|
// ── DataConnectionLayer-018: concurrent same-tag subscribes must not orphan adapter handles ──
|
|
|
|
[Fact]
|
|
public async Task DCL018_ConcurrentSubscribes_SameTag_DifferentInstances_IssueOneAdapterSubscribe()
|
|
{
|
|
// Regression test for DataConnectionLayer-018. Before the fix, HandleSubscribe
|
|
// snapshotted _subscriptionIds.Keys on the actor thread BEFORE the Task.Run
|
|
// I/O. Two SubscribeTagsRequest messages for different instances sharing a tag
|
|
// would both observe "not subscribed" (the first request's SubscribeCompleted
|
|
// hadn't yet posted), both call _adapter.SubscribeAsync, and the second
|
|
// subscription id would be silently dropped at the
|
|
// _subscriptionIds.ContainsKey guard in HandleSubscribeCompleted — orphaning
|
|
// the adapter's monitored item permanently. With the _subscribesInFlight
|
|
// guard, the second request observes the tag in flight and treats it as
|
|
// AlreadySubscribed without issuing a second adapter call.
|
|
var subscribeStartedFirst = new TaskCompletionSource();
|
|
var releaseFirst = new TaskCompletionSource<string>();
|
|
var subscribeCallCount = 0;
|
|
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync("shared/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ =>
|
|
{
|
|
var n = Interlocked.Increment(ref subscribeCallCount);
|
|
if (n == 1)
|
|
{
|
|
// Park the first subscribe so the second SubscribeTagsRequest
|
|
// arrives on the actor thread while the first I/O is still in flight.
|
|
subscribeStartedFirst.TrySetResult();
|
|
return releaseFirst.Task;
|
|
}
|
|
// The fix prevents this branch — fail loudly if it ever runs.
|
|
return Task.FromResult("sub-unexpected-" + n);
|
|
});
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl018-shared-tag");
|
|
await Task.Delay(300);
|
|
|
|
// Request 1 — instance A. Park its adapter call in flight.
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "instA", "dcl018-shared-tag", ["shared/tag"], DateTimeOffset.UtcNow));
|
|
await subscribeStartedFirst.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
|
|
|
// Request 2 — instance B, same tag, sent while instance A's I/O is still in flight.
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c2", "instB", "dcl018-shared-tag", ["shared/tag"], DateTimeOffset.UtcNow));
|
|
|
|
// Instance B's ack must come back before we release instance A — that proves
|
|
// instance B's request did NOT issue its own adapter SubscribeAsync (which is
|
|
// blocked) and instead saw the tag as in-flight.
|
|
SubscribeTagsResponse? bResponse = null;
|
|
for (var i = 0; i < 50 && bResponse is null; i++)
|
|
{
|
|
try { bResponse = ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromMilliseconds(100)); }
|
|
catch { /* keep polling */ }
|
|
if (bResponse?.InstanceUniqueName != "instB") bResponse = null;
|
|
}
|
|
|
|
Assert.NotNull(bResponse);
|
|
Assert.True(bResponse!.Success);
|
|
Assert.Equal(1, Volatile.Read(ref subscribeCallCount));
|
|
|
|
// Release instance A's subscribe so the test cleans up.
|
|
releaseFirst.SetResult("sub-shared");
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));
|
|
// One tag shared across two instances must count as exactly one subscription.
|
|
// DCL-020 (#28) tracks the related "previously-unresolved tag promoted via
|
|
// another instance" double-count case; here the tag was never unresolved.
|
|
Assert.Equal(1, report.ResolvedTags);
|
|
}
|
|
|
|
// ── DataConnectionLayer-020: previously-unresolved tag, resolved via different instance, must not double-count ──
|
|
|
|
[Fact]
|
|
public async Task DCL020_UnresolvedTagPromoted_ByDifferentInstance_DoesNotDoubleCountTotalSubscribed()
|
|
{
|
|
// Regression test for DataConnectionLayer-020. The first SubscribeTagsRequest
|
|
// (instance A, tag "promote/tag") fails at the resolution layer — the tag is
|
|
// added to _unresolvedTags AND _totalSubscribed is bumped to 1. The second
|
|
// SubscribeTagsRequest (instance B, same tag) succeeds the adapter call. Before
|
|
// the fix, HandleSubscribeCompleted's success branch unconditionally
|
|
// ++_totalSubscribed, taking the total to 2 — even though the logical
|
|
// subscription count is still 1. After the fix the success branch detects
|
|
// the unresolved-tag promotion and increments only _resolvedTags.
|
|
var subscribeCalls = 0;
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync("promote/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ =>
|
|
{
|
|
var n = Interlocked.Increment(ref subscribeCalls);
|
|
if (n == 1) return Task.FromException<string>(new KeyNotFoundException("not yet"));
|
|
return Task.FromResult("sub-promote-" + n);
|
|
});
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl020-promote");
|
|
await Task.Delay(300);
|
|
|
|
// Instance A — fails at resolution → _unresolvedTags has the tag, _totalSubscribed=1.
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "instA", "dcl020-promote", ["promote/tag"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5)); // bad-quality push
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
// Instance B — same tag, but this time the adapter succeeds (n==2 branch).
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c2", "instB", "dcl020-promote", ["promote/tag"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));
|
|
|
|
// Pre-fix: TotalSubscribedTags=2, ResolvedTags=1 (double-count).
|
|
// Post-fix: TotalSubscribedTags=1, ResolvedTags=1 — one logical tag, one resolved.
|
|
Assert.Equal(1, report.TotalSubscribedTags);
|
|
Assert.Equal(1, report.ResolvedTags);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DCL020_TwoInstancesFailingSameTag_OnlyCountsTagOnceInTotal()
|
|
{
|
|
// Regression test for DataConnectionLayer-020's symmetric failure branch.
|
|
// Two instances both fail to resolve the same tag — _unresolvedTags must hold
|
|
// a single entry and _totalSubscribed must be 1, not 2. Pre-fix the failure
|
|
// branch always ran _totalSubscribed++, double-counting on the second
|
|
// instance's failure.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync("missing/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException<string>(new KeyNotFoundException("not found")));
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl020-twofail");
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "instA", "dcl020-twofail", ["missing/tag"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c2", "instB", "dcl020-twofail", ["missing/tag"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));
|
|
|
|
Assert.Equal(1, report.TotalSubscribedTags);
|
|
Assert.Equal(0, report.ResolvedTags);
|
|
}
|
|
|
|
// ── DataConnectionLayer-021: mid-flight unsubscribe must release adapter handle and drop state ──
|
|
|
|
[Fact]
|
|
public async Task DCL021_UnsubscribeDuringInFlightSubscribe_ReleasesAdapterHandle_AndKeepsStateClean()
|
|
{
|
|
// Regression test for DataConnectionLayer-021. Previously HandleSubscribeCompleted
|
|
// re-created _subscriptionsByInstance[instanceName] when the instance had been
|
|
// unsubscribed while the subscribe I/O was in flight, and then ran the same
|
|
// counter/handle mutations as the happy path. The leak permanently inflated
|
|
// _subscriptionsByInstance, _tagSubscriberCount, and _totalSubscribed (also re-
|
|
// issued by ReSubscribeAll after every reconnect), and orphaned the adapter
|
|
// monitored item. After the fix, the stale completion is logged + dropped, and
|
|
// _adapter.UnsubscribeAsync is fired for each successful subscription id.
|
|
var subscribeStarted = new TaskCompletionSource();
|
|
var releaseSubscribe = new TaskCompletionSource<string>();
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync("stale/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ =>
|
|
{
|
|
subscribeStarted.TrySetResult();
|
|
return releaseSubscribe.Task;
|
|
});
|
|
_mockAdapter.UnsubscribeAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl021-mid-flight");
|
|
await Task.Delay(300);
|
|
|
|
// Subscribe instance A — block the adapter call so unsubscribe arrives first.
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "instA", "dcl021-mid-flight", ["stale/tag"], DateTimeOffset.UtcNow));
|
|
await subscribeStarted.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
|
|
|
// Unsubscribe instance A while the subscribe I/O is still parked.
|
|
actor.Tell(new UnsubscribeTagsRequest("unsub-c1", "instA", "dcl021-mid-flight", DateTimeOffset.UtcNow));
|
|
await Task.Delay(100);
|
|
|
|
// Release the subscribe — SubscribeCompleted is now stale.
|
|
releaseSubscribe.SetResult("sub-orphan");
|
|
|
|
// Wait for SubscribeTagsResponse OR a quiescence interval. The fix may skip
|
|
// the response (instance is gone); allow either outcome but require the
|
|
// adapter UnsubscribeAsync call to have fired.
|
|
await Task.Delay(500);
|
|
|
|
await _mockAdapter.Received(1).UnsubscribeAsync(
|
|
Arg.Is<string>(s => s == "sub-orphan"), Arg.Any<CancellationToken>());
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));
|
|
// Total / resolved must reflect the post-unsubscribe state: 0 tags subscribed.
|
|
Assert.Equal(0, report.TotalSubscribedTags);
|
|
Assert.Equal(0, report.ResolvedTags);
|
|
}
|
|
|
|
// ── DataConnectionLayer-022: tag-resolution retry timer must not reset on every failure ──
|
|
|
|
[Fact]
|
|
public async Task DCL022_BurstedFailedSubscribes_DoNotResetRetryTimer()
|
|
{
|
|
// Regression test for DataConnectionLayer-022. Both HandleSubscribeCompleted
|
|
// and HandleTagResolutionFailed previously called Timers.StartPeriodicTimer
|
|
// unconditionally — StartPeriodicTimer with an existing key cancels and
|
|
// replaces the prior timer, so a burst of SubscribeTagsRequests arriving
|
|
// faster than TagResolutionRetryInterval would re-arm the timer every time
|
|
// and starve the retry indefinitely. After the fix, IsTimerActive gates
|
|
// the StartPeriodicTimer call so the first failure starts the timer and
|
|
// subsequent failures just append to _unresolvedTags.
|
|
_options.TagResolutionRetryInterval = TimeSpan.FromMilliseconds(300);
|
|
|
|
var subscribeCalls = 0;
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
// All subscribes fail at the resolution layer (KeyNotFoundException is a
|
|
// non-connection failure → marks the tag unresolved and starts the timer).
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ =>
|
|
{
|
|
Interlocked.Increment(ref subscribeCalls);
|
|
return Task.FromException<string>(new KeyNotFoundException("not found"));
|
|
});
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl022-retry-gate");
|
|
await Task.Delay(300); // reach Connected
|
|
|
|
// Fire 5 SubscribeTagsRequests with distinct tags, each ~50ms apart so the
|
|
// total burst (~250ms) is well under the 300ms retry interval. Pre-fix,
|
|
// every failure called StartPeriodicTimer — the 5th call would cancel the
|
|
// running timer and re-arm a fresh 300ms wait, pushing the first retry to
|
|
// ~550ms after the first failure. Post-fix: timer starts once, fires at
|
|
// ~300ms after the first failure regardless of subsequent failures.
|
|
var t0 = DateTimeOffset.UtcNow;
|
|
for (var i = 0; i < 5; i++)
|
|
{
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
$"c{i}", $"inst{i}", "dcl022-retry-gate", [$"burst/tag{i}"], DateTimeOffset.UtcNow));
|
|
await Task.Delay(50);
|
|
}
|
|
|
|
// Drain the 5 ack messages and the 5 bad-quality TagValueUpdates so we
|
|
// don't accidentally compare them against the retry-induced subscribe count.
|
|
for (var i = 0; i < 5; i++)
|
|
{
|
|
// Order: bad-quality TagValueUpdate fires first, then SubscribeTagsResponse.
|
|
ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
}
|
|
|
|
var initialFailureCalls = Volatile.Read(ref subscribeCalls);
|
|
Assert.Equal(5, initialFailureCalls);
|
|
|
|
// Wait one retry interval past the first failure. If the timer was reset by
|
|
// each subsequent failure, no retry has fired yet and subscribeCalls is
|
|
// still 5. With the gate, the timer has fired and re-attempted every
|
|
// unresolved tag (5 more calls).
|
|
var firstFailureToNow = DateTimeOffset.UtcNow - t0;
|
|
var waitForRetryFire = TimeSpan.FromMilliseconds(450) - firstFailureToNow;
|
|
if (waitForRetryFire > TimeSpan.Zero)
|
|
{
|
|
await Task.Delay(waitForRetryFire);
|
|
}
|
|
|
|
var afterFirstInterval = Volatile.Read(ref subscribeCalls);
|
|
Assert.True(afterFirstInterval > initialFailureCalls,
|
|
$"Retry timer should have fired within ~300ms of the first failure, " +
|
|
$"but subscribeCalls stayed at {afterFirstInterval} (initial: {initialFailureCalls}). " +
|
|
$"This is the DCL-022 reset-on-every-call starvation regression.");
|
|
}
|
|
|
|
// ── DataConnectionLayer-011: stale callbacks from a disposed adapter must be dropped ──
|
|
|
|
[Fact]
|
|
public async Task DCL011_StaleTagValueFromOldAdapter_IsNotForwardedAfterFailover()
|
|
{
|
|
// Regression test for DataConnectionLayer-011. On failover the old adapter is
|
|
// disposed and a fresh one created, but the old adapter's subscription callbacks
|
|
// captured Self and keep Telling TagValueReceived. With no per-adapter generation
|
|
// tag, a value from the disposed adapter delivered after the actor is Connected
|
|
// on the new endpoint would be forwarded to the Instance Actor, mixing
|
|
// pre-failover device data with the active endpoint's data.
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var backupAdapter = Substitute.For<IDataConnection>();
|
|
|
|
SubscriptionCallback? primaryCallback = null;
|
|
|
|
var primaryConnectCount = 0;
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1
|
|
? Task.CompletedTask
|
|
: Task.FromException(new Exception("Primary down")));
|
|
primaryAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
primaryAdapter.SubscribeAsync("sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(ci =>
|
|
{
|
|
primaryCallback = (SubscriptionCallback)ci[1];
|
|
return Task.FromResult("sub-primary");
|
|
});
|
|
primaryAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
_mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
|
|
.Returns(backupAdapter);
|
|
backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
backupAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
backupAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns("sub-backup");
|
|
backupAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "dcl011-stale", primaryConfig, backupConfig, failoverRetryCount: 1);
|
|
|
|
AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
|
|
await Task.Delay(200);
|
|
|
|
actor.Tell(new SubscribeTagsRequest("c1", TestActor.Path.Name, "dcl011-stale", ["sensor/temp"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));
|
|
AwaitCondition(() => primaryCallback != null, TimeSpan.FromSeconds(3));
|
|
|
|
// Fail over to backup.
|
|
RaiseDisconnected(primaryAdapter);
|
|
|
|
// The disconnect pushes a bad-quality ConnectionQualityChanged to the subscriber.
|
|
ExpectMsg<ConnectionQualityChanged>(TimeSpan.FromSeconds(3));
|
|
|
|
AwaitCondition(() =>
|
|
backupAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "SubscribeAsync"),
|
|
TimeSpan.FromSeconds(5));
|
|
await Task.Delay(300); // actor is Connected on backup
|
|
|
|
// Drain any value updates produced by the re-subscribe path.
|
|
ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
|
|
|
// The disposed primary adapter's callback fires a stale value.
|
|
primaryCallback!("sensor/temp", new TagValue(999, QualityCode.Good, DateTimeOffset.UtcNow));
|
|
|
|
// That stale value must NOT reach the subscriber.
|
|
ExpectNoMsg(TimeSpan.FromSeconds(1));
|
|
}
|
|
|
|
// ── DataConnectionLayer-015: initial-connect failures must trigger failover ──
|
|
|
|
[Fact]
|
|
public async Task DCL015_PrimaryDownAtStartup_FailsOverToBackup()
|
|
{
|
|
// Regression test for DataConnectionLayer-015. HandleConnectResult — the handler
|
|
// for the INITIAL connection attempt in the Connecting state — only logged and
|
|
// re-armed the reconnect timer. It never incremented _consecutiveFailures and
|
|
// never switched endpoint, so a primary that is unreachable when the actor first
|
|
// starts (a fresh deployment, a site restart, a primary simply down) retried the
|
|
// primary forever and never tried the configured backup. After the fix the
|
|
// initial connect participates in the failover counter like HandleReconnectResult.
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var backupAdapter = Substitute.For<IDataConnection>();
|
|
|
|
// Primary is down from the very first attempt — it never connects.
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException(new Exception("Connection refused")));
|
|
|
|
// Factory returns the backup adapter when called with the backup config.
|
|
_mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
|
|
.Returns(backupAdapter);
|
|
backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "dcl015-startup-failover",
|
|
primaryConfig, backupConfig, failoverRetryCount: 2);
|
|
|
|
// After failoverRetryCount initial-connect failures on the primary, the actor
|
|
// must build the backup adapter. Pre-fix the factory was never called.
|
|
AwaitCondition(() =>
|
|
_mockFactory.ReceivedCalls().Any(c =>
|
|
c.GetMethodInfo().Name == "Create" &&
|
|
c.GetArguments()[1] is IDictionary<string, string> d &&
|
|
d["Endpoint"] == "opc.tcp://backup:4840"),
|
|
TimeSpan.FromSeconds(5));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DCL015_SingleEndpointDownAtStartup_RetriesIndefinitely_NoFailover()
|
|
{
|
|
// Companion guard: a single-endpoint connection (no backup) whose primary is
|
|
// unreachable at startup must keep retrying the same endpoint indefinitely — the
|
|
// initial-connect failover counter must not synthesise a non-existent backup.
|
|
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
|
var primaryAdapter = Substitute.For<IDataConnection>();
|
|
var connectCount = 0;
|
|
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ =>
|
|
{
|
|
Interlocked.Increment(ref connectCount);
|
|
return Task.FromException(new Exception("Connection refused"));
|
|
});
|
|
|
|
var actor = CreateFailoverActor(primaryAdapter, "dcl015-no-backup",
|
|
primaryConfig, backupConfig: null, failoverRetryCount: 2);
|
|
|
|
// Many retries occur (well past the failover threshold) but no adapter is ever
|
|
// created via the factory — there is nothing to fail over to.
|
|
AwaitCondition(() => connectCount >= 6, TimeSpan.FromSeconds(10));
|
|
_mockFactory.DidNotReceive().Create(Arg.Any<string>(), Arg.Any<IDictionary<string, string>>());
|
|
}
|
|
|
|
// ── DataConnectionLayer-016: subscribe response must reflect a connection-level failure ──
|
|
|
|
[Fact]
|
|
public async Task DCL016_ConnectionLevelSubscribeFailure_RepliesWithUnsuccessfulResponse()
|
|
{
|
|
// Regression test for DataConnectionLayer-016. When a subscribe arrives while the
|
|
// adapter is silently down, HandleSubscribeCompleted drove the actor into
|
|
// Reconnecting (a connection-level failure) but still replied to the caller with
|
|
// SubscribeTagsResponse(Success: true, Error: null). The Instance Actor was told
|
|
// the subscribe succeeded while the tags were never actually subscribed at the
|
|
// adapter. After the fix the response matches the actor's own assessment:
|
|
// Success: false with an explanatory error.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
// Subscribe fails at connection level (InvalidOperationException from EnsureConnected).
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException<string>(
|
|
new InvalidOperationException("OPC UA client is not connected.")));
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl016-conn-fail");
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "inst1", "dcl016-conn-fail", ["some/tag"], DateTimeOffset.UtcNow));
|
|
|
|
// The response must NOT claim success — the connection-level failure that drove
|
|
// Reconnecting means the tags were never subscribed at the adapter.
|
|
var response = ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
Assert.False(response.Success);
|
|
Assert.NotNull(response.ErrorMessage);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DCL016_GenuineResolutionFailure_StillRepliesSuccess()
|
|
{
|
|
// Companion guard: a genuine tag-resolution failure (the node does not exist) is
|
|
// a runtime quality concern, not a connection-level fault — the design tracks it
|
|
// via _unresolvedTags and a Bad-quality TagValueUpdate. The overall subscribe
|
|
// response stays Success: true so this case is not regressed by the 016 fix.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync("missing/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException<string>(new KeyNotFoundException("node not found")));
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl016-genuine");
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "inst1", "dcl016-genuine", ["missing/tag"], DateTimeOffset.UtcNow));
|
|
|
|
var ack = FishForMessage<SubscribeTagsResponse>(_ => true, TimeSpan.FromSeconds(5));
|
|
Assert.True(ack.Success);
|
|
Assert.Null(ack.ErrorMessage);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately()
|
|
{
|
|
// Behavioural guard: the restructured subscribe must preserve the original
|
|
// accounting — failed tags count toward TotalSubscribed but not ResolvedTags.
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(ci => ((string)ci[0]).StartsWith("bad")
|
|
? Task.FromException<string>(new Exception("tag not found"))
|
|
: Task.FromResult("sub-ok"));
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
|
|
var actor = CreateConnectionActor("dcl001-failed-tags");
|
|
await Task.Delay(300);
|
|
|
|
actor.Tell(new SubscribeTagsRequest(
|
|
"c1", "inst1", "dcl001-failed-tags",
|
|
["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow));
|
|
|
|
// Two genuine resolution failures now also push a bad-quality TagValueUpdate
|
|
// to the subscriber (DataConnectionLayer-004); skip past those to the ack.
|
|
var ack = FishForMessage<SubscribeTagsResponse>(_ => true, TimeSpan.FromSeconds(5));
|
|
Assert.True(ack.Success);
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
|
|
|
|
Assert.Equal(5, report.TotalSubscribedTags); // all 5 tags tracked
|
|
Assert.Equal(3, report.ResolvedTags); // only the 3 good ones resolved
|
|
}
|
|
|
|
// ── DataConnectionLayer-008: HandleUnsubscribe shared-tag reference counting ──
|
|
|
|
[Fact]
|
|
public async Task DCL008_Unsubscribe_OnlyReleasesTagWhenLastSubscriberLeaves()
|
|
{
|
|
// Regression test for DataConnectionLayer-008. HandleUnsubscribe must release a
|
|
// tag at the adapter only when no other instance still subscribes to it. The
|
|
// O(n) per-tag scan over every instance was replaced with an O(1) reference
|
|
// count; this guards that the reference count tracks shared subscriptions
|
|
// correctly — a shared tag is kept while any subscriber remains and the
|
|
// resolved-tag counter and adapter UnsubscribeAsync stay consistent.
|
|
var unsubscribed = new System.Collections.Concurrent.ConcurrentBag<string>();
|
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
|
.Returns(ci => Task.FromResult("sub-" + (string)ci[0]));
|
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(new ReadResult(false, null, null));
|
|
_mockAdapter.UnsubscribeAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(ci => { unsubscribed.Add((string)ci[0]); return Task.CompletedTask; });
|
|
|
|
var actor = CreateConnectionActor("dcl008-shared");
|
|
await Task.Delay(300);
|
|
|
|
// Two instances both subscribe to the shared tag; instA also has an exclusive tag.
|
|
actor.Tell(new SubscribeTagsRequest("c1", "instA", "dcl008-shared",
|
|
["shared/tag", "exclusive/a"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
actor.Tell(new SubscribeTagsRequest("c2", "instB", "dcl008-shared",
|
|
["shared/tag"], DateTimeOffset.UtcNow));
|
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
|
|
// Unsubscribe instA — shared/tag must stay (instB still subscribes); only
|
|
// exclusive/a is released at the adapter.
|
|
actor.Tell(new UnsubscribeTagsRequest("c3", "instA", "dcl008-shared", DateTimeOffset.UtcNow));
|
|
await Task.Delay(300);
|
|
|
|
Assert.Contains("sub-exclusive/a", unsubscribed);
|
|
Assert.DoesNotContain("sub-shared/tag", unsubscribed);
|
|
|
|
// Health: 1 tag still subscribed and resolved (shared/tag held by instB).
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report1 = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
|
|
Assert.Equal(1, report1.TotalSubscribedTags);
|
|
Assert.Equal(1, report1.ResolvedTags);
|
|
|
|
// Unsubscribe instB — now shared/tag has no subscribers and is released.
|
|
actor.Tell(new UnsubscribeTagsRequest("c4", "instB", "dcl008-shared", DateTimeOffset.UtcNow));
|
|
await Task.Delay(300);
|
|
|
|
Assert.Contains("sub-shared/tag", unsubscribed);
|
|
|
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
|
var report2 = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
|
|
Assert.Equal(0, report2.TotalSubscribedTags);
|
|
Assert.Equal(0, report2.ResolvedTags);
|
|
}
|
|
}
|