Files
lmxopcua/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DriverReconnectE2eTests.cs
T

270 lines
12 KiB
C#

using Akka.Actor;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
/// <summary>
/// E2E integration coverage for the <c>ReconnectDriver</c> command path through
/// <see cref="IAdminOperationsClient"/>.
///
/// <para>The first two tests verify the message round-trip through the
/// <c>AdminOperationsActor</c> singleton against a non-deployed instance id: the command is
/// accepted, persisted as a <c>ConfigEdit</c> audit row, and the reply carries
/// <c>Ok = true</c> with the matching <c>CorrelationId</c>.</para>
///
/// <para><see cref="Reconnect_DeployedDriver_TransitionsThroughReconnectingBackToHealthy"/>
/// goes the full distance: it deploys a real driver (via the opt-in
/// <see cref="FakeReconnectDriverFactory"/> wired into the harness) so the
/// <c>DriverHostActor</c> spawns a managed <c>DriverInstanceActor</c>, then drives the
/// end-to-end reconnect path —
/// <c>ReconnectDriver → AdminOperationsActor → DriverHostActor.HandleReconnectDriver →
/// DriverInstanceActor.ForceReconnect (FSM) → PublishHealthSnapshot → driver-health DPS topic →
/// DriverStatusSignalRBridge → snapshot store / hub push</c> — and asserts the published health
/// transitions Healthy → Reconnecting → Healthy.</para>
/// </summary>
[Trait("Category", "Integration")]
public sealed class DriverReconnectE2eTests
{
private static CancellationToken Ct => TestContext.Current.CancellationToken;
/// <summary>
/// Verifies that a <see cref="ReconnectDriver"/> message dispatched through
/// <see cref="IAdminOperationsClient.AskAsync{T}"/> returns a
/// <see cref="ReconnectDriverResult"/> with <c>Ok = true</c> and the matching
/// correlation ID, confirming the cluster-singleton round-trip works end-to-end.
///
/// <para>The instance ID used here ("reconnect-e2e-nonexistent") does not correspond
/// to a deployed driver, so no <c>DriverInstanceActor</c> will act on the DPS
/// broadcast — the test is validating the command ingestion and reply path only.</para>
/// </summary>
[Fact]
public async Task Reconnect_RoundTrip_ReturnsOk()
{
await using var harness = await TwoNodeClusterHarness.StartAsync();
await using var scope = harness.NodeA.Services.CreateAsyncScope();
var client = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
var correlationId = Guid.NewGuid();
var msg = new ReconnectDriver(
ClusterId: "cluster-e2e-test",
DriverInstanceId: "reconnect-e2e-nonexistent",
ActorByUserName: "e2e-test-runner",
CorrelationId: correlationId);
var result = await client.AskAsync<ReconnectDriverResult>(msg, Ct);
result.CorrelationId.ShouldBe(correlationId);
result.Ok.ShouldBeTrue($"ReconnectDriver round-trip failed: {result.Message}");
result.Message.ShouldBeNull();
}
/// <summary>
/// Verifies that a second <see cref="ReconnectDriver"/> for the same instance ID
/// is also accepted (idempotent at the actor layer — the actor simply re-broadcasts
/// to DPS and writes another <c>ConfigEdit</c> row).
/// </summary>
[Fact]
public async Task Reconnect_IsIdempotent_SecondCallAlsoReturnsOk()
{
await using var harness = await TwoNodeClusterHarness.StartAsync();
await using var scope = harness.NodeA.Services.CreateAsyncScope();
var client = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
const string instanceId = "reconnect-idempotency-test";
var first = new ReconnectDriver("cluster-1", instanceId, "runner", Guid.NewGuid());
var second = new ReconnectDriver("cluster-1", instanceId, "runner", Guid.NewGuid());
var r1 = await client.AskAsync<ReconnectDriverResult>(first, Ct);
var r2 = await client.AskAsync<ReconnectDriverResult>(second, Ct);
r1.Ok.ShouldBeTrue($"First call failed: {r1.Message}");
r2.Ok.ShouldBeTrue($"Second call failed: {r2.Message}");
r1.CorrelationId.ShouldBe(first.CorrelationId);
r2.CorrelationId.ShouldBe(second.CorrelationId);
}
private const string ClusterId = "RECONNECT-E2E";
private const string DriverId = "drv-modbus";
/// <summary>
/// Full-stack reconnect: deploys a real driver (the in-process
/// <see cref="FakeReconnectDriverFactory"/>), proves it reaches Healthy on the driver-health DPS
/// topic, simulates a lost connection (<see cref="FakeReconnectDriver.ReportReconnecting"/>), issues
/// a <see cref="ReconnectDriver"/> through <see cref="IAdminOperationsClient"/>, and asserts the
/// published health walks Healthy → Reconnecting → Healthy — captured at the
/// <see cref="DriverStatusSignalRBridge"/> hub-push seam. Confirms the operator Reconnect threads
/// the whole cluster path and genuinely re-initialises the driver (<c>InitializeCount ≥ 2</c>).
/// </summary>
[Fact]
public async Task Reconnect_DeployedDriver_TransitionsThroughReconnectingBackToHealthy()
{
var factory = new FakeReconnectDriverFactory();
await using var harness = await TwoNodeClusterHarness.StartAsync(driverFactory: factory);
var store = harness.NodeA.Services.GetRequiredService<IDriverStatusSnapshotStore>();
// Capture every DriverHealthChanged the bridge pushes to the hub (the first SendCoreAsync arg).
var captured = new List<DriverHealthChanged>();
var captureLock = new object();
var mockClients = new Mock<IHubClients>();
var mockClientProxy = new Mock<IClientProxy>();
mockClients.Setup(c => c.Group(It.IsAny<string>())).Returns(mockClientProxy.Object);
mockClientProxy
.Setup(p => p.SendCoreAsync(It.IsAny<string>(), It.IsAny<object?[]>(), It.IsAny<CancellationToken>()))
.Callback<string, object?[], CancellationToken>((_, args, _) =>
{
if (args.FirstOrDefault() is DriverHealthChanged hc)
lock (captureLock) captured.Add(hc);
})
.Returns(Task.CompletedTask);
var mockHub = new Mock<IHubContext<DriverStatusHub>>();
mockHub.Setup(h => h.Clients).Returns(mockClients.Object);
// Spawn the bridge + wait for its DPS SubscribeAck BEFORE deploying, so it catches the initial
// Healthy publish (DPS is fire-and-forget with no replay, and repeat publishes are deduped).
var bridge = harness.NodeASystem.ActorOf(
DriverStatusSignalRBridge.Props(mockHub.Object, store),
$"test-reconnect-bridge-{Guid.NewGuid():N}");
await Task.Delay(TimeSpan.FromSeconds(2), Ct);
try
{
// Validator-clean seed: a single cluster bound to NodeA with one enabled Modbus driver, no
// equipment/tags (tags would trip DraftValidator → deploy Rejected).
await SeedSingleDriverClusterAsync(harness);
await using var scope = harness.NodeA.Services.CreateAsyncScope();
var client = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
var deploy = await client.StartDeploymentAsync(createdBy: "e2e", Ct);
deploy.Outcome.ShouldBe(StartDeploymentOutcome.Accepted, $"Deploy not accepted: {deploy.Message}");
// Wait until the driver is spawned (factory recorded it) AND reached Healthy in the store.
await WaitForAsync(() => Task.FromResult(
factory.Created.TryGetValue(DriverId, out _)
&& store.TryGet(DriverId, out var s) && s.State == "Healthy"),
TimeSpan.FromSeconds(20));
// Simulate the lost connection the operator Reconnect responds to.
factory.Created[DriverId].ReportReconnecting();
var result = await client.AskAsync<ReconnectDriverResult>(
new ReconnectDriver(ClusterId, DriverId, "e2e", Guid.NewGuid()), Ct);
result.Ok.ShouldBeTrue($"ReconnectDriver failed: {result.Message}");
// The published health must walk Reconnecting → (later) Healthy for this driver.
await WaitForAsync(() =>
{
lock (captureLock) return Task.FromResult(HasReconnectThenHealthy(captured));
}, TimeSpan.FromSeconds(20));
List<DriverHealthChanged> snapshot;
lock (captureLock) snapshot = captured.Where(c => c.DriverInstanceId == DriverId).ToList();
HasReconnectThenHealthy(snapshot).ShouldBeTrue(
"Expected a Reconnecting push followed by a later Healthy push for the deployed driver. " +
$"States seen: [{string.Join(", ", snapshot.Select(c => c.State))}]");
store.TryGet(DriverId, out var final).ShouldBeTrue();
final.State.ShouldBe("Healthy");
// ≥ 2 proves the command genuinely re-initialised the driver via the full cluster path
// (initial connect + at least one reconnect retry).
factory.Created[DriverId].InitializeCount.ShouldBeGreaterThanOrEqualTo(2);
}
finally
{
harness.NodeASystem.Stop(bridge);
}
}
/// <summary>
/// True when the captured pushes for <see cref="DriverId"/> contain a <c>Reconnecting</c> entry
/// followed by a strictly-later <c>Healthy</c> entry (the ordered sub-sequence the reconnect FSM
/// produces).
/// </summary>
private static bool HasReconnectThenHealthy(List<DriverHealthChanged> captured)
{
var states = captured.Where(c => c.DriverInstanceId == DriverId).Select(c => c.State).ToList();
var reconnectAt = states.IndexOf("Reconnecting");
if (reconnectAt < 0) return false;
return states.Skip(reconnectAt + 1).Contains("Healthy");
}
/// <summary>
/// Seeds one single-node cluster bound to NodeA with one enabled Modbus <see cref="DriverInstance"/>
/// and no equipment/tags, so <c>StartDeploymentAsync</c> returns <c>Accepted</c> and NodeA's
/// <c>DriverHostActor</c> spawns the driver as a managed child. Mirrors
/// <c>MultiClusterScopingTests</c>'s validator-clean entity shapes.
/// </summary>
private static async Task SeedSingleDriverClusterAsync(TwoNodeClusterHarness harness)
{
await using var db = await harness.CreateConfigDbContextAsync();
db.ServerClusters.Add(new ServerCluster
{
ClusterId = ClusterId,
Name = "Reconnect E2E Cluster",
Enterprise = "zb",
Site = "central",
NodeCount = 1,
RedundancyMode = RedundancyMode.None,
CreatedBy = "test",
});
db.Namespaces.Add(new Namespace
{
NamespaceId = "RECONNECT-E2E-equipment",
ClusterId = ClusterId,
Kind = NamespaceKind.Equipment,
NamespaceUri = "urn:zb:reconnect-e2e:equipment",
});
db.ClusterNodes.Add(new ClusterNode
{
NodeId = harness.NodeANodeId,
ClusterId = ClusterId,
Host = TwoNodeClusterHarness.LoopbackHost,
ApplicationUri = "urn:zb:reconnect-e2e:node-a",
CreatedBy = "test",
});
db.DriverInstances.Add(new DriverInstance
{
DriverInstanceId = DriverId,
ClusterId = ClusterId,
NamespaceId = "RECONNECT-E2E-equipment",
Name = DriverId,
DriverType = "Modbus",
Enabled = true,
DriverConfig = "{}",
});
await db.SaveChangesAsync(Ct);
}
/// <summary>Polls <paramref name="condition"/> every 100 ms until it returns true or
/// <paramref name="timeout"/> elapses (then throws <see cref="TimeoutException"/>).</summary>
private static async Task WaitForAsync(Func<Task<bool>> condition, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (DateTime.UtcNow < deadline)
{
if (await condition()) return;
await Task.Delay(100);
}
throw new TimeoutException($"Condition not met within {timeout}");
}
}