using Akka.Actor;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;

namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;

/// <summary>
/// E2E integration coverage for the <see cref="ReconnectDriver"/> command path through the
/// admin-operations client resolved from NodeA's service provider.
/// <para>
/// The first two tests verify the message round-trip through the AdminOperationsActor
/// singleton against a non-deployed instance id: the command is accepted and the reply carries
/// <c>Ok = true</c> with the matching <c>CorrelationId</c>. (Per the actor's contract the
/// command is also persisted as a ConfigEdit audit row; these tests do not assert that.)
/// </para>
/// <para>
/// <see cref="Reconnect_DeployedDriver_TransitionsThroughReconnectingBackToHealthy"/> goes the
/// full distance: it deploys a real driver (via the opt-in <c>driverFactory</c> argument of
/// <c>TwoNodeClusterHarness.StartAsync</c>) so the DriverHostActor spawns a managed
/// DriverInstanceActor, then drives the end-to-end reconnect path —
/// ReconnectDriver → AdminOperationsActor → DriverHostActor.HandleReconnectDriver →
/// DriverInstanceActor.ForceReconnect (FSM) → PublishHealthSnapshot → driver-health DPS topic →
/// DriverStatusSignalRBridge → snapshot store / hub push — and asserts the published health
/// transitions Healthy → Reconnecting → Healthy.
/// </para>
/// </summary>
[Trait("Category", "Integration")]
public sealed class DriverReconnectE2eTests
{
    /// <summary>Cluster id seeded for the deployed-driver test.</summary>
    private const string ClusterId = "RECONNECT-E2E";

    /// <summary>Driver instance id seeded (and deployed) for the deployed-driver test.</summary>
    private const string DriverId = "drv-modbus";

    /// <summary>Cancellation token supplied by the xUnit test context for the running test.</summary>
    private static CancellationToken Ct => TestContext.Current.CancellationToken;

    /// <summary>
    /// Verifies that a <see cref="ReconnectDriver"/> message dispatched through the
    /// admin-operations client returns a reply with <c>Ok = true</c>, a null <c>Message</c>, and
    /// the matching correlation ID, confirming the cluster-singleton round-trip works end-to-end.
    /// </summary>
    /// <remarks>
    /// The instance ID used here (<c>"reconnect-e2e-nonexistent"</c>) does not correspond to a
    /// deployed driver, so no DriverInstanceActor will act on the DPS broadcast — the test is
    /// validating the command ingestion and reply path only.
    /// </remarks>
    [Fact]
    public async Task Reconnect_RoundTrip_ReturnsOk()
    {
        await using var harness = await TwoNodeClusterHarness.StartAsync();
        await using var scope = harness.NodeA.Services.CreateAsyncScope();
        var client = scope.ServiceProvider.GetRequiredService();

        var correlationId = Guid.NewGuid();
        var msg = new ReconnectDriver(
            ClusterId: "cluster-e2e-test",
            DriverInstanceId: "reconnect-e2e-nonexistent",
            ActorByUserName: "e2e-test-runner",
            CorrelationId: correlationId);

        var result = await client.AskAsync(msg, Ct);

        result.CorrelationId.ShouldBe(correlationId);
        result.Ok.ShouldBeTrue($"ReconnectDriver round-trip failed: {result.Message}");
        result.Message.ShouldBeNull();
    }

    /// <summary>
    /// Verifies that a second <see cref="ReconnectDriver"/> for the same instance ID is also
    /// accepted. Per the actor layer's contract the command is idempotent (the actor re-broadcasts
    /// to DPS and writes another ConfigEdit row); this test asserts only the two replies.
    /// </summary>
    [Fact]
    public async Task Reconnect_IsIdempotent_SecondCallAlsoReturnsOk()
    {
        await using var harness = await TwoNodeClusterHarness.StartAsync();
        await using var scope = harness.NodeA.Services.CreateAsyncScope();
        var client = scope.ServiceProvider.GetRequiredService();

        const string instanceId = "reconnect-idempotency-test";
        var first = new ReconnectDriver("cluster-1", instanceId, "runner", Guid.NewGuid());
        var second = new ReconnectDriver("cluster-1", instanceId, "runner", Guid.NewGuid());

        var r1 = await client.AskAsync(first, Ct);
        var r2 = await client.AskAsync(second, Ct);

        r1.Ok.ShouldBeTrue($"First call failed: {r1.Message}");
        r2.Ok.ShouldBeTrue($"Second call failed: {r2.Message}");
        r1.CorrelationId.ShouldBe(first.CorrelationId);
        r2.CorrelationId.ShouldBe(second.CorrelationId);
    }

    /// <summary>
    /// Full-stack reconnect: deploys a real driver (created in-process by
    /// <see cref="FakeReconnectDriverFactory"/>), proves it reaches Healthy in the snapshot store
    /// fed by the driver-health DPS topic, simulates a lost connection
    /// (<c>ReportReconnecting()</c>), issues a <see cref="ReconnectDriver"/> through the
    /// admin-operations client, and asserts the published health walks
    /// Healthy → Reconnecting → Healthy — captured at the hub-push seam.
    /// </summary>
    /// <remarks>
    /// Confirms the operator Reconnect command threads the whole cluster path and genuinely
    /// re-initialises the driver (<c>InitializeCount ≥ 2</c>).
    /// </remarks>
    [Fact]
    public async Task Reconnect_DeployedDriver_TransitionsThroughReconnectingBackToHealthy()
    {
        var factory = new FakeReconnectDriverFactory();
        await using var harness = await TwoNodeClusterHarness.StartAsync(driverFactory: factory);
        var store = harness.NodeA.Services.GetRequiredService();

        // Capture every DriverHealthChanged the bridge pushes to the hub (the first SendCoreAsync arg).
        var captured = new List();
        var captureLock = new object();
        var mockClients = new Mock();
        var mockClientProxy = new Mock();
        mockClients.Setup(c => c.Group(It.IsAny())).Returns(mockClientProxy.Object);
        mockClientProxy
            .Setup(p => p.SendCoreAsync(It.IsAny(), It.IsAny(), It.IsAny()))
            .Callback((_, args, _) =>
            {
                // The bridge is an actor, so it calls the proxy off the test thread; guard the list.
                if (args.FirstOrDefault() is DriverHealthChanged hc)
                {
                    lock (captureLock)
                    {
                        captured.Add(hc);
                    }
                }
            })
            .Returns(Task.CompletedTask);
        var mockHub = new Mock>();
        mockHub.Setup(h => h.Clients).Returns(mockClients.Object);

        // Spawn the bridge BEFORE deploying so it catches the initial Healthy publish (DPS is
        // fire-and-forget with no replay, and repeat publishes are deduped).
        var bridge = harness.NodeASystem.ActorOf(
            DriverStatusSignalRBridge.Props(mockHub.Object, store),
            $"test-reconnect-bridge-{Guid.NewGuid():N}");

        // NOTE(review): this is a fixed grace period standing in for the bridge's DPS
        // SubscribeAck, not an actual wait on it. If the bridge exposes a readiness signal,
        // await that instead to remove the timing dependency.
        await Task.Delay(TimeSpan.FromSeconds(2), Ct);

        try
        {
            // Validator-clean seed: a single cluster bound to NodeA with one enabled Modbus driver, no
            // equipment/tags (tags would trip DraftValidator → deploy Rejected).
            await SeedSingleDriverClusterAsync(harness);

            await using var scope = harness.NodeA.Services.CreateAsyncScope();
            var client = scope.ServiceProvider.GetRequiredService();

            var deploy = await client.StartDeploymentAsync(createdBy: "e2e", Ct);
            deploy.Outcome.ShouldBe(StartDeploymentOutcome.Accepted, $"Deploy not accepted: {deploy.Message}");

            // Wait until the driver is spawned (factory recorded it) AND reached Healthy in the store.
            await WaitForAsync(
                () => Task.FromResult(
                    factory.Created.TryGetValue(DriverId, out _)
                    && store.TryGet(DriverId, out var s)
                    && s.State == "Healthy"),
                TimeSpan.FromSeconds(20),
                $"driver '{DriverId}' spawned and Healthy in the snapshot store");

            // Simulate the lost connection the operator Reconnect responds to.
            factory.Created[DriverId].ReportReconnecting();

            var result = await client.AskAsync(
                new ReconnectDriver(ClusterId, DriverId, "e2e", Guid.NewGuid()), Ct);
            result.Ok.ShouldBeTrue($"ReconnectDriver failed: {result.Message}");

            // The published health must walk Reconnecting → (later) Healthy for this driver. On
            // timeout, fall through so the assertion below reports the states actually seen
            // instead of surfacing a bare TimeoutException.
            try
            {
                await WaitForAsync(
                    () =>
                    {
                        lock (captureLock)
                        {
                            return Task.FromResult(HasReconnectThenHealthy(captured));
                        }
                    },
                    TimeSpan.FromSeconds(20),
                    $"Reconnecting followed by Healthy push for driver '{DriverId}'");
            }
            catch (TimeoutException)
            {
                // Intentionally not rethrown: the assertion below re-checks the same condition
                // and fails with the captured state sequence as diagnostics.
            }

            List snapshot;
            lock (captureLock)
            {
                snapshot = captured.Where(c => c.DriverInstanceId == DriverId).ToList();
            }

            HasReconnectThenHealthy(snapshot).ShouldBeTrue(
                "Expected a Reconnecting push followed by a later Healthy push for the deployed driver. " +
                $"States seen: [{string.Join(", ", snapshot.Select(c => c.State))}]");

            store.TryGet(DriverId, out var final).ShouldBeTrue();
            final.State.ShouldBe("Healthy");

            // ≥ 2 proves the command genuinely re-initialised the driver via the full cluster path
            // (initial connect + at least one reconnect retry).
            factory.Created[DriverId].InitializeCount.ShouldBeGreaterThanOrEqualTo(2);
        }
        finally
        {
            harness.NodeASystem.Stop(bridge);
        }
    }

    /// <summary>
    /// Returns <see langword="true"/> when the captured pushes for <see cref="DriverId"/> contain a
    /// <c>Reconnecting</c> entry followed by a strictly-later <c>Healthy</c> entry (the ordered
    /// sub-sequence the reconnect FSM produces). Pushes for other drivers are ignored.
    /// </summary>
    /// <param name="captured">Hub pushes in the order they were captured.</param>
    private static bool HasReconnectThenHealthy(List captured)
    {
        var states = captured.Where(c => c.DriverInstanceId == DriverId).Select(c => c.State).ToList();
        var reconnectAt = states.IndexOf("Reconnecting");
        if (reconnectAt < 0)
        {
            return false;
        }

        // Any Healthy after the first Reconnecting satisfies the ordering requirement.
        return states.Skip(reconnectAt + 1).Contains("Healthy");
    }

    /// <summary>
    /// Seeds one single-node cluster bound to NodeA with one enabled Modbus
    /// <see cref="DriverInstance"/> and no equipment/tags, so <c>StartDeploymentAsync</c> returns
    /// Accepted and NodeA's DriverHostActor spawns the driver as a managed child. Mirrors
    /// MultiClusterScopingTests's validator-clean entity shapes.
    /// </summary>
    /// <param name="harness">The running harness whose configuration database is seeded.</param>
    private static async Task SeedSingleDriverClusterAsync(TwoNodeClusterHarness harness)
    {
        await using var db = await harness.CreateConfigDbContextAsync();

        db.ServerClusters.Add(new ServerCluster
        {
            ClusterId = ClusterId,
            Name = "Reconnect E2E Cluster",
            Enterprise = "zb",
            Site = "central",
            NodeCount = 1,
            RedundancyMode = RedundancyMode.None,
            CreatedBy = "test",
        });

        db.Namespaces.Add(new Namespace
        {
            NamespaceId = "RECONNECT-E2E-equipment",
            ClusterId = ClusterId,
            Kind = NamespaceKind.Equipment,
            NamespaceUri = "urn:zb:reconnect-e2e:equipment",
        });

        db.ClusterNodes.Add(new ClusterNode
        {
            NodeId = harness.NodeANodeId,
            ClusterId = ClusterId,
            Host = TwoNodeClusterHarness.LoopbackHost,
            ApplicationUri = "urn:zb:reconnect-e2e:node-a",
            CreatedBy = "test",
        });

        db.DriverInstances.Add(new DriverInstance
        {
            DriverInstanceId = DriverId,
            ClusterId = ClusterId,
            NamespaceId = "RECONNECT-E2E-equipment",
            Name = DriverId,
            DriverType = "Modbus",
            Enabled = true,
            DriverConfig = "{}",
        });

        await db.SaveChangesAsync(Ct);
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 100 ms until it returns <see langword="true"/> or
    /// <paramref name="timeout"/> elapses.
    /// </summary>
    /// <param name="condition">Asynchronous predicate evaluated on each poll.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <param name="description">
    /// Optional description of what is being awaited; appended to the timeout message so a
    /// failure identifies which wait expired.
    /// </param>
    /// <exception cref="TimeoutException">The condition did not become true within <paramref name="timeout"/>.</exception>
    /// <exception cref="OperationCanceledException">The test's cancellation token was signalled while polling.</exception>
    private static async Task WaitForAsync(Func> condition, TimeSpan timeout, string? description = null)
    {
        // Stopwatch is monotonic, so a wall-clock adjustment mid-test cannot shorten or stretch the wait.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (elapsed.Elapsed < timeout)
        {
            if (await condition())
            {
                return;
            }

            // Honour test cancellation between polls instead of spinning until the timeout.
            await Task.Delay(100, Ct);
        }

        throw new TimeoutException(description is null
            ? $"Condition not met within {timeout}"
            : $"Condition not met within {timeout}: {description}");
    }
}