diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/MultiClusterScopingTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/MultiClusterScopingTests.cs new file mode 100644 index 00000000..b96cbd88 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/MultiClusterScopingTests.cs @@ -0,0 +1,176 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; + +/// +/// End-to-end multi-cluster scoping over the real 2-node Akka cluster: two driver nodes are bound +/// to DIFFERENT logical clusters (MAIN + SITE-A) via their rows, then a +/// SINGLE deployment is composed + broadcast through the real +/// AdminOperationsActor → ConfigPublishCoordinator → DriverHostActor path. +/// +/// This proves the full deploy path applies per-ClusterId scoping that the actor-level test +/// (DriverHostActor_spawns_only_its_clusters_drivers) covers in isolation: the node in MAIN +/// ends up hosting ONLY the MAIN driver, the node in SITE-A ONLY the SITE-A driver, and the +/// deployment still CONVERGES (seals) even though each node applies only a 1-driver slice — the ack +/// fires unconditionally regardless of slice size. +/// +/// The harness wires NullDriverFactory (no real transports), so each spawned driver is +/// stubbed; we assert presence + identity (not connectivity) via +/// , the same cross-node Ask path +/// FleetDiagnosticsRoundTripTests uses. +/// +public sealed class MultiClusterScopingTests +{ + private const string MainCluster = "MAIN"; + private const string SiteACluster = "SITE-A"; + private const string MainDriverId = "main-modbus"; + private const string SiteADriverId = "sa-modbus"; + + private static CancellationToken Ct => TestContext.Current.CancellationToken; + + /// + /// Verifies a single deploy scopes drivers per node: MAIN's node hosts only the MAIN driver, + /// SITE-A's node only the SITE-A driver, and both nodes reach Applied so the deployment seals. + /// + [Fact] + public async Task Deploy_scopes_drivers_to_each_nodes_own_cluster() + { + await using var harness = await TwoNodeClusterHarness.StartAsync(); + + // Assign each cluster node to a DIFFERENT logical cluster. NodeId MUST equal the node's + // ClusterRoleInfo.LocalNode (host:port) so the artifact's Nodes[] map resolves each node's + // ClusterId at apply time. + await SeedTwoClusterConfigAsync(harness, mainNodeId: harness.NodeANodeId, siteANodeId: harness.NodeBNodeId); + + await using var scope = harness.NodeA.Services.CreateAsyncScope(); + var adminOps = scope.ServiceProvider.GetRequiredService(); + + var result = await adminOps.StartDeploymentAsync(createdBy: "alice@test", Ct); + result.Outcome.ShouldBe(StartDeploymentOutcome.Accepted); + var deploymentId = result.DeploymentId!.Value.Value; + + // Convergence: both nodes ack Applied and the coordinator seals — even though each node + // applied only a 1-driver slice of the 2-cluster artifact. + await WaitForAsync(async () => + { + await using var db = await harness.CreateConfigDbContextAsync(); + var d = await db.Deployments.AsNoTracking() + .FirstOrDefaultAsync(d => d.DeploymentId == deploymentId, Ct); + return d?.Status == DeploymentStatus.Sealed; + }, TimeSpan.FromSeconds(20)); + + await using (var db = await harness.CreateConfigDbContextAsync()) + { + var nodeStates = await db.NodeDeploymentStates.AsNoTracking() + .Where(s => s.DeploymentId == deploymentId) + .ToListAsync(Ct); + nodeStates.Count.ShouldBe(2); + nodeStates.ShouldAllBe(s => s.Status == NodeDeploymentStatus.Applied); + } + + // Per-node driver presence: each DriverHostActor spawned ONLY its own cluster's slice. + var diagnostics = scope.ServiceProvider.GetRequiredService(); + + var mainDrivers = await GetDriverNamesAsync(diagnostics, harness.NodeANodeId); + mainDrivers.ShouldBe(new[] { MainDriverId }); + + var siteADrivers = await GetDriverNamesAsync(diagnostics, harness.NodeBNodeId); + siteADrivers.ShouldBe(new[] { SiteADriverId }); + } + + /// Asks a node's DriverHostActor (over the cluster) for the names of its spawned drivers. + private static async Task GetDriverNamesAsync(IFleetDiagnosticsClient diagnostics, string nodeId) + { + var snapshot = await diagnostics.GetDiagnosticsAsync(NodeId.Parse(nodeId), Ct); + return snapshot.Drivers.Select(d => d.Name).OrderBy(n => n, StringComparer.Ordinal).ToArray(); + } + + /// + /// Seeds two single-node clusters (MAIN, SITE-A), one per cluster bound + /// to the supplied host:port identities, and one per cluster. + /// emits these straight into the artifact's + /// Clusters / Nodes / DriverInstances arrays, giving each DriverHostActor the + /// multi-cluster artifact DeploymentArtifact.ResolveClusterScope filters by NodeId. + /// + private static async Task SeedTwoClusterConfigAsync(TwoNodeClusterHarness harness, string mainNodeId, string siteANodeId) + { + await using var db = await harness.CreateConfigDbContextAsync(); + + db.ServerClusters.AddRange( + NewCluster(MainCluster, "Main Cluster", "central"), + NewCluster(SiteACluster, "Site A Cluster", "site-a")); + + db.Namespaces.AddRange( + NewNamespace(MainCluster, "MAIN-equipment", "urn:zb:central:equipment"), + NewNamespace(SiteACluster, "SITE-A-equipment", "urn:zb:site-a:equipment")); + + db.ClusterNodes.AddRange( + NewNode(mainNodeId, MainCluster, "urn:zb:central:node-a"), + NewNode(siteANodeId, SiteACluster, "urn:zb:site-a:node-b")); + + db.DriverInstances.AddRange( + NewDriver(MainDriverId, MainCluster, "MAIN-equipment"), + NewDriver(SiteADriverId, SiteACluster, "SITE-A-equipment")); + + await db.SaveChangesAsync(Ct); + } + + private static ServerCluster NewCluster(string clusterId, string name, string site) => new() + { + ClusterId = clusterId, + Name = name, + Enterprise = "zb", + Site = site, + NodeCount = 1, + RedundancyMode = RedundancyMode.None, + CreatedBy = "test", + }; + + private static Namespace NewNamespace(string clusterId, string namespaceId, string uri) => new() + { + NamespaceId = namespaceId, + ClusterId = clusterId, + Kind = NamespaceKind.Equipment, + NamespaceUri = uri, + }; + + private static ClusterNode NewNode(string nodeId, string clusterId, string applicationUri) => new() + { + NodeId = nodeId, + ClusterId = clusterId, + Host = TwoNodeClusterHarness.LoopbackHost, + ApplicationUri = applicationUri, + CreatedBy = "test", + }; + + private static DriverInstance NewDriver(string driverInstanceId, string clusterId, string namespaceId) => new() + { + DriverInstanceId = driverInstanceId, + ClusterId = clusterId, + NamespaceId = namespaceId, + Name = driverInstanceId, + DriverType = "ModbusTcp", + Enabled = true, + DriverConfig = "{}", + }; + + private static async Task WaitForAsync(Func> condition, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (await condition()) return; + await Task.Delay(200); + } + throw new TimeoutException($"Condition not met within {timeout}"); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs index b0141d1c..09093f4e 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs @@ -90,6 +90,23 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable /// Gets the Akka ActorSystem for node B. public ActorSystem NodeBSystem => NodeB.Services.GetRequiredService(); + /// + /// The host:port identity each node's DriverHostActor / NodeDeploymentState + /// row uses, derived the same way ClusterRoleInfo derives LocalNode from + /// Cluster:PublicHostname + Cluster:Port. Seed a ClusterNode.NodeId with this + /// value to bind a node to a logical ClusterId for multi-cluster scoping tests. + /// + public string NodeANodeId => $"{LoopbackHost}:{NodeAAkkaPort}"; + + /// + public string NodeBNodeId => $"{LoopbackHost}:{NodeBAkkaPort}"; + + /// Opens a new over the shared ConfigDb (the same + /// store both nodes read) so a test can seed clusters/nodes/drivers before triggering a deploy. + /// A new DbContext the caller is responsible for disposing. + public Task CreateConfigDbContextAsync() + => NodeA.Services.GetRequiredService>().CreateDbContextAsync(); + /// Boots both nodes and waits up to for cluster convergence. /// Maximum time to wait for cluster formation; defaults to 20 seconds if not provided. public static async Task StartAsync(TimeSpan? formationTimeout = null)