Files
Joseph Doherty add7210d9e fix(dcl): route native alarm subscribe/unsubscribe through DataConnectionManagerActor
The NativeAlarmActor sends SubscribeAlarmsRequest to the DCL manager, but the
manager only routed tag/write/browse messages to the per-connection
DataConnectionActor — alarm subscribe/unsubscribe were unhandled and dead-lettered,
so native alarms never subscribed at runtime. Caught by live T28 deployment.
Mirrors the existing HandleRoute forwarding.
2026-05-31 03:25:28 -04:00

145 lines
6.4 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests;
/// <summary>
/// WP-34: Tests for DataConnectionManagerActor routing and lifecycle.
/// </summary>
/// <remarks>
/// Every test creates its own manager actor inside the TestKit actor system,
/// wired to NSubstitute mocks of the connection factory and the site health
/// collector. Replies from the manager arrive at the TestKit's implicit sender
/// and are checked with <c>ExpectMsg</c>.
/// </remarks>
public class DataConnectionManagerActorTests : TestKit
{
    /// <summary>
    /// Fixed pause used where a test has to let asynchronous actor work finish
    /// (initial connect, supervisor failure handling) and there is no reply
    /// message to wait on instead.
    /// </summary>
    private static readonly TimeSpan ActorSettleDelay = TimeSpan.FromMilliseconds(300);

    /// <summary>Mock factory handed to the manager; tests stub <c>Create</c> on it.</summary>
    private readonly IDataConnectionFactory _mockFactory;

    /// <summary>Options handed to the manager, with millisecond-scale retry intervals.</summary>
    private readonly DataConnectionOptions _options;

    /// <summary>Mock health collector handed to the manager; never asserted on here.</summary>
    private readonly ISiteHealthCollector _mockHealthCollector;

    /// <summary>
    /// Builds the shared mocks and options, and starts the TestKit actor system
    /// with DEBUG logging.
    /// </summary>
    public DataConnectionManagerActorTests()
        : base(@"akka.loglevel = DEBUG")
    {
        _mockFactory = Substitute.For<IDataConnectionFactory>();
        _mockHealthCollector = Substitute.For<ISiteHealthCollector>();
        _options = new DataConnectionOptions
        {
            ReconnectInterval = TimeSpan.FromMilliseconds(100),
            TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200)
        };
    }

    /// <summary>
    /// Starts a new <see cref="DataConnectionManagerActor"/> in the test actor
    /// system using the shared mocks and options.
    /// </summary>
    /// <returns>The reference of the freshly created manager actor.</returns>
    private IActorRef CreateManager() =>
        Sys.ActorOf(Props.Create(() =>
            new DataConnectionManagerActor(_mockFactory, _options, _mockHealthCollector)));

    /// <summary>
    /// A write addressed to a connection name that was never created must be
    /// answered with a failed <see cref="WriteTagResponse"/>.
    /// </summary>
    [Fact]
    public void WriteToUnknownConnection_ReturnsError()
    {
        var manager = CreateManager();

        manager.Tell(new WriteTagRequest(
            "corr1", "nonexistent", "tag1", 42, DateTimeOffset.UtcNow));

        var response = ExpectMsg<WriteTagResponse>();
        Assert.False(response.Success);
        Assert.Contains("Unknown connection", response.ErrorMessage);
    }

    /// <summary>
    /// A tag subscription addressed to a connection name that was never created
    /// must be answered with a failed <see cref="SubscribeTagsResponse"/>.
    /// </summary>
    [Fact]
    public void SubscribeToUnknownConnection_ReturnsError()
    {
        var manager = CreateManager();

        manager.Tell(new SubscribeTagsRequest(
            "corr1", "inst1", "nonexistent", ["tag1"], DateTimeOffset.UtcNow));

        var response = ExpectMsg<SubscribeTagsResponse>();
        Assert.False(response.Success);
        Assert.Contains("Unknown connection", response.ErrorMessage);
    }

    /// <summary>
    /// An alarm subscription addressed to a connection name that was never
    /// created must be answered with a failed <see cref="SubscribeAlarmsResponse"/>.
    /// </summary>
    [Fact]
    public void SubscribeAlarmsToUnknownConnection_ReturnsError()
    {
        // Regression for the live integration gap: the NativeAlarmActor sends
        // SubscribeAlarmsRequest to the DCL manager, which must route it to the
        // named connection actor (or reply with an error) — not dead-letter it.
        var manager = CreateManager();

        manager.Tell(new SubscribeAlarmsRequest(
            "corr1", "inst1", "nonexistent", "ns=2;s=Tank01", null, DateTimeOffset.UtcNow));

        var response = ExpectMsg<SubscribeAlarmsResponse>();
        Assert.False(response.Success);
        Assert.Contains("Unknown connection", response.ErrorMessage);
    }

    /// <summary>
    /// After a write throws synchronously inside the connection actor, the
    /// health report must still show the one tag subscribed before the failure.
    /// </summary>
    [Fact]
    public async Task DCL002_ConnectionActorCrash_PreservesSubscriptionState()
    {
        // Regression test for DataConnectionLayer-002. The supervisor used
        // Directive.Restart, which discards the connection actor's in-memory
        // subscription registry — breaking the design doc's "transparent
        // re-subscribe" guarantee (subscribers are never re-subscribed and sit at
        // stale quality forever). After the fix the supervisor uses Resume, which
        // keeps the actor instance and its state across a transient exception.
        var mockAdapter = Substitute.For<IDataConnection>();
        mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
            .Returns(Task.CompletedTask);
        mockAdapter.Status.Returns(ConnectionHealth.Connected);
        mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
            .Returns("sub-001");
        mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
            .Returns(new ReadResult(false, null, null));

        // A write throws synchronously, escaping the message handler and crashing
        // the connection actor — exercising the supervisor strategy.
        mockAdapter.WriteAsync(Arg.Any<string>(), Arg.Any<object?>(), Arg.Any<CancellationToken>())
            .Returns<Task<WriteResult>>(_ => throw new InvalidOperationException("boom"));

        _mockFactory.Create("OpcUa", Arg.Any<IDictionary<string, string>>()).Returns(mockAdapter);

        var manager = CreateManager();
        manager.Tell(new CreateConnectionCommand("conn1", "OpcUa", new Dictionary<string, string>(), null, 3));
        await Task.Delay(ActorSettleDelay); // connection actor reaches Connected

        // Register a subscription.
        manager.Tell(new SubscribeTagsRequest("c1", "inst1", "conn1", ["tag1"], DateTimeOffset.UtcNow));
        ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));

        // Crash the connection actor via a synchronously-throwing write.
        manager.Tell(new WriteTagRequest("c2", "conn1", "tag1", 42, DateTimeOffset.UtcNow));
        await Task.Delay(ActorSettleDelay); // supervisor handles the failure

        // After the crash the subscription state must survive: the health report
        // still shows the subscribed/resolved tag. With Restart it would be 0.
        manager.Tell(new GetAllHealthReports());
        var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
        Assert.Equal(1, report.TotalSubscribedTags);
        Assert.Equal(1, report.ResolvedTags);
    }

    /// <summary>
    /// Creating a connection must ask the factory for an adapter of the
    /// requested protocol type.
    /// </summary>
    [Fact]
    public void CreateConnection_UsesFactory()
    {
        var mockAdapter = Substitute.For<IDataConnection>();
        mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
            .Returns(Task.CompletedTask);
        _mockFactory.Create("OpcUa", Arg.Any<IDictionary<string, string>>())
            .Returns(mockAdapter);

        var manager = CreateManager();
        manager.Tell(new CreateConnectionCommand(
            "conn1", "OpcUa", new Dictionary<string, string>(), null, 3));

        // The factory is invoked asynchronously relative to Tell, so poll until
        // the typed verification passes. Received() throws while the call is
        // still missing, and AwaitAssert retries until the timeout elapses.
        // Verifying through the typed call (instead of matching the method name
        // as a string) also checks that the "OpcUa" protocol type was passed.
        AwaitAssert(
            () => _mockFactory.Received().Create("OpcUa", Arg.Any<IDictionary<string, string>>()),
            TimeSpan.FromSeconds(2));
    }
}