From f87ad5ae8f4a39cd0d5f8168eecd788266df13e4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 18 Jun 2026 09:12:15 -0400 Subject: [PATCH] test(adminui): E2E deployed-driver Healthy->Reconnecting->Healthy transition on Reconnect --- .../DriverReconnectE2eTests.cs | 201 +++++++++++++++++- .../Fakes/FakeReconnectDriverFactory.cs | 80 ++++++- 2 files changed, 262 insertions(+), 19 deletions(-) diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DriverReconnectE2eTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DriverReconnectE2eTests.cs index 002bd4c2..0eba864a 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DriverReconnectE2eTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DriverReconnectE2eTests.cs @@ -1,8 +1,15 @@ +using Akka.Actor; +using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.DependencyInjection; +using Moq; using Shouldly; using Xunit; +using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; @@ -10,17 +17,20 @@ namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; /// E2E integration coverage for the ReconnectDriver command path through /// . /// -/// Scope note: wiring a live DriverInstanceActor for the full -/// Healthy → Reconnecting → Healthy health-transition assertion requires a deployed -/// driver row in the config DB, a real fixture endpoint, and the -/// DriverHostActor to have registered the instance — substantially more -/// harness complexity than the two-node cluster setup alone provides. That deeper -/// fixture is tracked as a follow-up. This suite instead verifies the message -/// round-trip through the AdminOperationsActor singleton: the command is +/// The first two tests verify the message round-trip through the +/// AdminOperationsActor singleton against a non-deployed instance id: the command is /// accepted, persisted as a ConfigEdit audit row, and the reply carries -/// Ok = true with the matching CorrelationId. The DPS broadcast -/// that triggers the actor-side reconnect is exercised by the control-plane unit -/// tests that mock IActorRef. +/// Ok = true with the matching CorrelationId. +/// +/// +/// goes the full distance: it deploys a real driver (via the opt-in +/// wired into the harness) so the +/// DriverHostActor spawns a managed DriverInstanceActor, then drives the +/// end-to-end reconnect path — +/// ReconnectDriver → AdminOperationsActor → DriverHostActor.HandleReconnectDriver → +/// DriverInstanceActor.ForceReconnect (FSM) → PublishHealthSnapshot → driver-health DPS topic → +/// DriverStatusSignalRBridge → snapshot store / hub push — and asserts the published health +/// transitions Healthy → Reconnecting → Healthy. /// [Trait("Category", "Integration")] public sealed class DriverReconnectE2eTests @@ -83,4 +93,175 @@ public sealed class DriverReconnectE2eTests r1.CorrelationId.ShouldBe(first.CorrelationId); r2.CorrelationId.ShouldBe(second.CorrelationId); } + + private const string ClusterId = "RECONNECT-E2E"; + private const string DriverId = "drv-modbus"; + + /// + /// Full-stack reconnect: deploys a real driver (the in-process + /// ), proves it reaches Healthy on the driver-health DPS + /// topic, simulates a lost connection (), issues + /// a through , and asserts the + /// published health walks Healthy → Reconnecting → Healthy — captured at the + /// hub-push seam. Confirms the operator Reconnect threads + /// the whole cluster path and genuinely re-initialises the driver (InitializeCount ≥ 2). + /// + [Fact] + public async Task Reconnect_DeployedDriver_TransitionsThroughReconnectingBackToHealthy() + { + var factory = new FakeReconnectDriverFactory(); + await using var harness = await TwoNodeClusterHarness.StartAsync(driverFactory: factory); + + var store = harness.NodeA.Services.GetRequiredService(); + + // Capture every DriverHealthChanged the bridge pushes to the hub (the first SendCoreAsync arg). + var captured = new List(); + var captureLock = new object(); + var mockClients = new Mock(); + var mockClientProxy = new Mock(); + mockClients.Setup(c => c.Group(It.IsAny())).Returns(mockClientProxy.Object); + mockClientProxy + .Setup(p => p.SendCoreAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Callback((_, args, _) => + { + if (args.FirstOrDefault() is DriverHealthChanged hc) + lock (captureLock) captured.Add(hc); + }) + .Returns(Task.CompletedTask); + var mockHub = new Mock>(); + mockHub.Setup(h => h.Clients).Returns(mockClients.Object); + + // Spawn the bridge + wait for its DPS SubscribeAck BEFORE deploying, so it catches the initial + // Healthy publish (DPS is fire-and-forget with no replay, and repeat publishes are deduped). + var bridge = harness.NodeASystem.ActorOf( + DriverStatusSignalRBridge.Props(mockHub.Object, store), + $"test-reconnect-bridge-{Guid.NewGuid():N}"); + await Task.Delay(TimeSpan.FromSeconds(2), Ct); + + try + { + // Validator-clean seed: a single cluster bound to NodeA with one enabled Modbus driver, no + // equipment/tags (tags would trip DraftValidator → deploy Rejected). + await SeedSingleDriverClusterAsync(harness); + + await using var scope = harness.NodeA.Services.CreateAsyncScope(); + var client = scope.ServiceProvider.GetRequiredService(); + + var deploy = await client.StartDeploymentAsync(createdBy: "e2e", Ct); + deploy.Outcome.ShouldBe(StartDeploymentOutcome.Accepted, $"Deploy not accepted: {deploy.Message}"); + + // Wait until the driver is spawned (factory recorded it) AND reached Healthy in the store. + await WaitForAsync(() => Task.FromResult( + factory.Created.TryGetValue(DriverId, out _) + && store.TryGet(DriverId, out var s) && s.State == "Healthy"), + TimeSpan.FromSeconds(20)); + + // Simulate the lost connection the operator Reconnect responds to. + factory.Created[DriverId].ReportReconnecting(); + + var result = await client.AskAsync( + new ReconnectDriver(ClusterId, DriverId, "e2e", Guid.NewGuid()), Ct); + result.Ok.ShouldBeTrue($"ReconnectDriver failed: {result.Message}"); + + // The published health must walk Reconnecting → (later) Healthy for this driver. + await WaitForAsync(() => + { + lock (captureLock) return Task.FromResult(HasReconnectThenHealthy(captured)); + }, TimeSpan.FromSeconds(20)); + + List snapshot; + lock (captureLock) snapshot = captured.Where(c => c.DriverInstanceId == DriverId).ToList(); + + HasReconnectThenHealthy(captured).ShouldBeTrue( + "Expected a Reconnecting push followed by a later Healthy push for the deployed driver. " + + $"States seen: [{string.Join(", ", snapshot.Select(c => c.State))}]"); + + store.TryGet(DriverId, out var final).ShouldBeTrue(); + final.State.ShouldBe("Healthy"); + + // ≥ 2 proves the command genuinely re-initialised the driver via the full cluster path + // (initial connect + at least one reconnect retry). + factory.Created[DriverId].InitializeCount.ShouldBeGreaterThanOrEqualTo(2); + } + finally + { + harness.NodeASystem.Stop(bridge); + } + } + + /// + /// True when the captured pushes for contain a Reconnecting entry + /// followed by a strictly-later Healthy entry (the ordered sub-sequence the reconnect FSM + /// produces). + /// + private static bool HasReconnectThenHealthy(List captured) + { + var states = captured.Where(c => c.DriverInstanceId == DriverId).Select(c => c.State).ToList(); + var reconnectAt = states.IndexOf("Reconnecting"); + if (reconnectAt < 0) return false; + return states.Skip(reconnectAt + 1).Contains("Healthy"); + } + + /// + /// Seeds one single-node cluster bound to NodeA with one enabled Modbus + /// and no equipment/tags, so StartDeploymentAsync returns Accepted and NodeA's + /// DriverHostActor spawns the driver as a managed child. Mirrors + /// MultiClusterScopingTests's validator-clean entity shapes. + /// + private static async Task SeedSingleDriverClusterAsync(TwoNodeClusterHarness harness) + { + await using var db = await harness.CreateConfigDbContextAsync(); + + db.ServerClusters.Add(new ServerCluster + { + ClusterId = ClusterId, + Name = "Reconnect E2E Cluster", + Enterprise = "zb", + Site = "central", + NodeCount = 1, + RedundancyMode = RedundancyMode.None, + CreatedBy = "test", + }); + + db.Namespaces.Add(new Namespace + { + NamespaceId = "RECONNECT-E2E-equipment", + ClusterId = ClusterId, + Kind = NamespaceKind.Equipment, + NamespaceUri = "urn:zb:reconnect-e2e:equipment", + }); + + db.ClusterNodes.Add(new ClusterNode + { + NodeId = harness.NodeANodeId, + ClusterId = ClusterId, + Host = TwoNodeClusterHarness.LoopbackHost, + ApplicationUri = "urn:zb:reconnect-e2e:node-a", + CreatedBy = "test", + }); + + db.DriverInstances.Add(new DriverInstance + { + DriverInstanceId = DriverId, + ClusterId = ClusterId, + NamespaceId = "RECONNECT-E2E-equipment", + Name = DriverId, + DriverType = "Modbus", + Enabled = true, + DriverConfig = "{}", + }); + + await db.SaveChangesAsync(Ct); + } + + private static async Task WaitForAsync(Func> condition, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (await condition()) return; + await Task.Delay(100); + } + throw new TimeoutException($"Condition not met within {timeout}"); + } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/Fakes/FakeReconnectDriverFactory.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/Fakes/FakeReconnectDriverFactory.cs index d9e560e3..bad9995f 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/Fakes/FakeReconnectDriverFactory.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/Fakes/FakeReconnectDriverFactory.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; @@ -15,24 +16,38 @@ namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; /// initialize/connect succeeds, so the wrapping DriverInstanceActor walks /// Connecting → Connected → Healthy and (on ReconnectDriver) drives /// ForceReconnect → Reconnecting → re-initialize → Connected without any fault injection. +/// Each created driver is recorded in so a test can reach in and flip its +/// health to just before issuing the reconnect command. /// public sealed class FakeReconnectDriverFactory : IDriverFactory { /// The single driver type this fake factory materialises. public const string FakeDriverType = "Modbus"; + /// + /// Drivers this factory has created, keyed by driverInstanceId, so a test can retrieve the + /// live instance (e.g. to call before + /// dispatching a reconnect command). Concurrent because the factory is invoked on the spawning + /// actor thread while the test reads on its own thread. + /// + public ConcurrentDictionary Created { get; } = new(); + /// /// Returns a when is - /// ; otherwise null (the host logs + skips the row). + /// ; otherwise null (the host logs + skips the row). Records the + /// created driver in keyed by . /// /// The driver type name from the deployed DriverInstance row. /// The stable driver-instance identifier. /// The driver configuration as a JSON string (ignored by the fake). /// A new , or null for an unsupported type. public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson) - => driverType == FakeDriverType - ? new FakeReconnectDriver(driverInstanceId, driverType) - : null; + { + if (driverType != FakeDriverType) return null; + var driver = new FakeReconnectDriver(driverInstanceId, driverType); + Created[driverInstanceId] = driver; + return driver; + } /// Gets the driver-type names this factory can materialise. public IReadOnlyCollection SupportedTypes { get; } = new[] { FakeDriverType }; @@ -43,6 +58,14 @@ public sealed class FakeReconnectDriverFactory : IDriverFactory /// wrapping DriverInstanceActor reaches Connected and publishes a /// snapshot. Reads/writes/subscribes are benign no-ops; this exists /// only to let a deployed driver walk the Healthy ↔ Reconnecting FSM in-process for E2E tests. +/// +/// The driver's reported health is controllable: a test calls +/// to make return (simulating a lost +/// connection — the realistic trigger for an operator Reconnect). The DriverInstanceActor's +/// ForceReconnect handler POLLS right after entering its Reconnecting +/// state, so that snapshot surfaces on the driver-health topic. +/// The subsequent retry calls , which clears the flag back to +/// , so the next snapshot returns to Healthy. /// public sealed class FakeReconnectDriver : IDriver { @@ -63,16 +86,48 @@ public sealed class FakeReconnectDriver : IDriver /// Gets the driver type name (e.g. "Modbus"). public string DriverType { get; } + /// + /// When true, reports . + /// volatile because the actor polls from its own thread while the + /// test flips this from another via . + /// + private volatile bool _reconnecting; + + /// Timestamp of the most recent successful initialize; surfaced as the last successful read. + private DateTime _lastSuccess = DateTime.UtcNow; + + /// + /// Number of times has been invoked. Read by the test to prove a + /// reconnect genuinely re-initialised the driver through the full cluster path (≥ 2 means the + /// initial connect plus at least one reconnect retry). Mutated via since + /// the actor's retry path runs on a thread-pool thread. + /// + public int InitializeCount; + + /// + /// Marks the driver as having lost its connection so the next poll reports + /// . The test calls this immediately before dispatching the + /// reconnect command, simulating the realistic operator-Reconnect trigger. + /// + public void ReportReconnecting() => _reconnecting = true; + /// /// Connect/initialize path — always succeeds (returns a completed task), so the actor self-Tells /// InitializeSucceeded and becomes Connected. This is the method that makes connect - /// succeed; the FSM's reconnect path re-invokes it and it succeeds again. + /// succeed; the FSM's reconnect path re-invokes it and it succeeds again. Increments + /// and clears the reconnecting flag (initialize succeeded → healthy + /// again). /// /// The driver configuration JSON (ignored). /// Cancellation token for the operation. /// A completed task — initialization always succeeds. public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) - => Task.CompletedTask; + { + Interlocked.Increment(ref InitializeCount); + _lastSuccess = DateTime.UtcNow; + _reconnecting = false; + return Task.CompletedTask; + } /// Applies a config change in place — a no-op that always succeeds. /// The driver configuration JSON (ignored). @@ -87,9 +142,16 @@ public sealed class FakeReconnectDriver : IDriver public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; - /// Returns a snapshot so deployed drivers publish health. - /// A in the state. - public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null); + /// + /// Returns a snapshot when the test has flagged a lost + /// connection via ; otherwise a + /// snapshot. The actor polls this on every observable state change, so the published state tracks + /// this flag. + /// + /// A reflecting the controllable connection state. + public DriverHealth GetHealth() => _reconnecting + ? new DriverHealth(DriverState.Reconnecting, _lastSuccess, null) + : new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); /// Returns a zero memory footprint (the fake holds no driver-attributable caches). /// Always 0.