using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Hosting; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.DependencyInjection; using Moq; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers; namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; /// /// E2E integration coverage for the DriverStatusSignalRBridge actor โ†’ snapshot /// store โ†’ SignalR hub push pipeline. /// /// Scope note: wiring a full SignalR hub connection from inside an /// integration test requires an HTTP listener, JWT authentication (the hub has /// [Authorize]), and a real WebSocket upgrade โ€” significantly more plumbing /// than the two-node harness provides out of the box. Full-stack hub connectivity is /// covered by the Playwright smoke tests in the manual runbook (ยง8.3). This suite /// instead exercises the bridge actor directly: it spawns a /// inside the harness actor system, publishes /// a to the driver-health DPS topic, and /// asserts that (a) the snapshot store is updated and (b) the mock /// receives a SendAsync call with /// the matching DriverInstanceId. This validates the bridge actor's DPS /// subscription, store write, and hub-push code paths without a live HTTP client. /// [Trait("Category", "Integration")] public sealed class DriverStatusHubE2eTests { private static CancellationToken Ct => TestContext.Current.CancellationToken; /// /// Verifies that a published to the /// driver-health DPS topic is forwarded by /// to both the (via Upsert) and the /// mock (via SendAsync). /// [Fact] public async Task StatusHub_BridgeActor_ForwardsHealthChanged_ToStoreAndHub() { await using var harness = await TwoNodeClusterHarness.StartAsync(); // Resolve the snapshot store that AddAdminUI() wired into DI. var store = harness.NodeA.Services.GetRequiredService(); // Build a mock IHubContext that captures SendAsync calls. var sentMessages = new List<(string method, object? arg)>(); var mockClients = new Mock(); var mockClientProxy = new Mock(); mockClients.Setup(c => c.Group(It.IsAny())).Returns(mockClientProxy.Object); mockClientProxy .Setup(p => p.SendCoreAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Callback((method, args, _) => sentMessages.Add((method, args.FirstOrDefault()))) .Returns(Task.CompletedTask); var mockHub = new Mock>(); mockHub.Setup(h => h.Clients).Returns(mockClients.Object); // Spawn the bridge actor directly in the harness ActorSystem. var bridge = harness.NodeASystem.ActorOf( DriverStatusSignalRBridge.Props(mockHub.Object, store), $"test-driver-status-bridge-{Guid.NewGuid():N}"); // Wait for the DPS subscription to be acknowledged. await Task.Delay(TimeSpan.FromSeconds(2), Ct); // Publish a DriverHealthChanged snapshot via DPS. const string testInstanceId = "driver-hub-e2e-test-instance"; var snapshot = new DriverHealthChanged( ClusterId: "cluster-e2e", DriverInstanceId: testInstanceId, State: "Healthy", LastSuccessfulReadUtc: DateTime.UtcNow, LastError: null, ErrorCount5Min: 0, PublishedUtc: DateTime.UtcNow); DistributedPubSub.Get(harness.NodeASystem).Mediator.Tell( new Publish(DriverStatusSignalRBridge.TopicName, snapshot)); // Wait up to 3s for the bridge actor to process the message and invoke the hub mock. await WaitForAsync( () => Task.FromResult(sentMessages.Count > 0), TimeSpan.FromSeconds(3)); // Assert snapshot store was updated. store.TryGet(testInstanceId, out var stored).ShouldBeTrue("Snapshot store should contain the published snapshot."); stored.DriverInstanceId.ShouldBe(testInstanceId); stored.State.ShouldBe("Healthy"); // Assert hub mock received the push on the expected method name. sentMessages.ShouldNotBeEmpty("Hub mock should have received a SendAsync call."); sentMessages[0].method.ShouldBe(DriverStatusHub.MethodName); // Clean up actor to avoid lingering DPS subscription. harness.NodeASystem.Stop(bridge); } /// /// Verifies that publishing two consecutive snapshots /// for the same instance ID results in the store holding only the most recent state /// (last-write-wins) and both hub push calls being made. /// [Fact] public async Task StatusHub_BridgeActor_LastSnapshotWins_InStore() { await using var harness = await TwoNodeClusterHarness.StartAsync(); var store = harness.NodeA.Services.GetRequiredService(); var hubCallCount = 0; var mockClients = new Mock(); var mockClientProxy = new Mock(); mockClients.Setup(c => c.Group(It.IsAny())).Returns(mockClientProxy.Object); mockClientProxy .Setup(p => p.SendCoreAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Callback((_, _, _) => Interlocked.Increment(ref hubCallCount)) .Returns(Task.CompletedTask); var mockHub = new Mock>(); mockHub.Setup(h => h.Clients).Returns(mockClients.Object); var bridge = harness.NodeASystem.ActorOf( DriverStatusSignalRBridge.Props(mockHub.Object, store), $"test-driver-status-bridge-2-{Guid.NewGuid():N}"); await Task.Delay(TimeSpan.FromSeconds(2), Ct); const string instanceId = "driver-hub-last-write-wins"; var mediator = DistributedPubSub.Get(harness.NodeASystem).Mediator; mediator.Tell(new Publish(DriverStatusSignalRBridge.TopicName, new DriverHealthChanged("c1", instanceId, "Reconnecting", null, "lost connection", 1, DateTime.UtcNow))); mediator.Tell(new Publish(DriverStatusSignalRBridge.TopicName, new DriverHealthChanged("c1", instanceId, "Healthy", DateTime.UtcNow, null, 0, DateTime.UtcNow))); await WaitForAsync( () => Task.FromResult(hubCallCount >= 2), TimeSpan.FromSeconds(3)); // Store should reflect the most recent (Healthy) state. store.TryGet(instanceId, out var stored).ShouldBeTrue(); stored.State.ShouldBe("Healthy"); hubCallCount.ShouldBeGreaterThanOrEqualTo(2); harness.NodeASystem.Stop(bridge); } private static async Task WaitForAsync(Func> condition, TimeSpan timeout) { var deadline = DateTime.UtcNow + timeout; while (DateTime.UtcNow < deadline) { if (await condition()) return; await Task.Delay(100); } throw new TimeoutException($"Condition not met within {timeout}"); } }