test(integration): multi-cluster deploy scopes drivers per node
This commit is contained in:
@@ -0,0 +1,176 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end multi-cluster scoping over the real 2-node Akka cluster: two driver nodes are bound
|
||||
/// to DIFFERENT logical clusters (MAIN + SITE-A) via their <see cref="ClusterNode"/> rows, then a
|
||||
/// SINGLE deployment is composed + broadcast through the real
|
||||
/// <c>AdminOperationsActor → ConfigPublishCoordinator → DriverHostActor</c> path.
|
||||
///
|
||||
/// <para>This proves the full deploy path applies per-ClusterId scoping that the actor-level test
|
||||
/// (<c>DriverHostActor_spawns_only_its_clusters_drivers</c>) covers in isolation: the node in MAIN
|
||||
/// ends up hosting ONLY the MAIN driver, the node in SITE-A ONLY the SITE-A driver, and the
|
||||
/// deployment still CONVERGES (seals) even though each node applies only a 1-driver slice — the ack
|
||||
/// fires unconditionally regardless of slice size.</para>
|
||||
///
|
||||
/// <para>The harness wires <c>NullDriverFactory</c> (no real transports), so each spawned driver is
|
||||
/// stubbed; we assert presence + identity (not connectivity) via
|
||||
/// <see cref="IFleetDiagnosticsClient"/>, the same cross-node Ask path
|
||||
/// <c>FleetDiagnosticsRoundTripTests</c> uses.</para>
|
||||
/// </summary>
|
||||
public sealed class MultiClusterScopingTests
|
||||
{
|
||||
private const string MainCluster = "MAIN";
|
||||
private const string SiteACluster = "SITE-A";
|
||||
private const string MainDriverId = "main-modbus";
|
||||
private const string SiteADriverId = "sa-modbus";
|
||||
|
||||
private static CancellationToken Ct => TestContext.Current.CancellationToken;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies a single deploy scopes drivers per node: MAIN's node hosts only the MAIN driver,
|
||||
/// SITE-A's node only the SITE-A driver, and both nodes reach Applied so the deployment seals.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Deploy_scopes_drivers_to_each_nodes_own_cluster()
|
||||
{
|
||||
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
||||
|
||||
// Assign each cluster node to a DIFFERENT logical cluster. NodeId MUST equal the node's
|
||||
// ClusterRoleInfo.LocalNode (host:port) so the artifact's Nodes[] map resolves each node's
|
||||
// ClusterId at apply time.
|
||||
await SeedTwoClusterConfigAsync(harness, mainNodeId: harness.NodeANodeId, siteANodeId: harness.NodeBNodeId);
|
||||
|
||||
await using var scope = harness.NodeA.Services.CreateAsyncScope();
|
||||
var adminOps = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
|
||||
|
||||
var result = await adminOps.StartDeploymentAsync(createdBy: "alice@test", Ct);
|
||||
result.Outcome.ShouldBe(StartDeploymentOutcome.Accepted);
|
||||
var deploymentId = result.DeploymentId!.Value.Value;
|
||||
|
||||
// Convergence: both nodes ack Applied and the coordinator seals — even though each node
|
||||
// applied only a 1-driver slice of the 2-cluster artifact.
|
||||
await WaitForAsync(async () =>
|
||||
{
|
||||
await using var db = await harness.CreateConfigDbContextAsync();
|
||||
var d = await db.Deployments.AsNoTracking()
|
||||
.FirstOrDefaultAsync(d => d.DeploymentId == deploymentId, Ct);
|
||||
return d?.Status == DeploymentStatus.Sealed;
|
||||
}, TimeSpan.FromSeconds(20));
|
||||
|
||||
await using (var db = await harness.CreateConfigDbContextAsync())
|
||||
{
|
||||
var nodeStates = await db.NodeDeploymentStates.AsNoTracking()
|
||||
.Where(s => s.DeploymentId == deploymentId)
|
||||
.ToListAsync(Ct);
|
||||
nodeStates.Count.ShouldBe(2);
|
||||
nodeStates.ShouldAllBe(s => s.Status == NodeDeploymentStatus.Applied);
|
||||
}
|
||||
|
||||
// Per-node driver presence: each DriverHostActor spawned ONLY its own cluster's slice.
|
||||
var diagnostics = scope.ServiceProvider.GetRequiredService<IFleetDiagnosticsClient>();
|
||||
|
||||
var mainDrivers = await GetDriverNamesAsync(diagnostics, harness.NodeANodeId);
|
||||
mainDrivers.ShouldBe(new[] { MainDriverId });
|
||||
|
||||
var siteADrivers = await GetDriverNamesAsync(diagnostics, harness.NodeBNodeId);
|
||||
siteADrivers.ShouldBe(new[] { SiteADriverId });
|
||||
}
|
||||
|
||||
/// <summary>Asks a node's DriverHostActor (over the cluster) for the names of its spawned drivers.</summary>
|
||||
private static async Task<string[]> GetDriverNamesAsync(IFleetDiagnosticsClient diagnostics, string nodeId)
|
||||
{
|
||||
var snapshot = await diagnostics.GetDiagnosticsAsync(NodeId.Parse(nodeId), Ct);
|
||||
return snapshot.Drivers.Select(d => d.Name).OrderBy(n => n, StringComparer.Ordinal).ToArray();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Seeds two single-node clusters (MAIN, SITE-A), one <see cref="ClusterNode"/> per cluster bound
|
||||
/// to the supplied <c>host:port</c> identities, and one <see cref="DriverInstance"/> per cluster.
|
||||
/// <see cref="ConfigComposer.SnapshotAndFlattenAsync"/> emits these straight into the artifact's
|
||||
/// <c>Clusters</c> / <c>Nodes</c> / <c>DriverInstances</c> arrays, giving each DriverHostActor the
|
||||
/// multi-cluster artifact <c>DeploymentArtifact.ResolveClusterScope</c> filters by NodeId.
|
||||
/// </summary>
|
||||
private static async Task SeedTwoClusterConfigAsync(TwoNodeClusterHarness harness, string mainNodeId, string siteANodeId)
|
||||
{
|
||||
await using var db = await harness.CreateConfigDbContextAsync();
|
||||
|
||||
db.ServerClusters.AddRange(
|
||||
NewCluster(MainCluster, "Main Cluster", "central"),
|
||||
NewCluster(SiteACluster, "Site A Cluster", "site-a"));
|
||||
|
||||
db.Namespaces.AddRange(
|
||||
NewNamespace(MainCluster, "MAIN-equipment", "urn:zb:central:equipment"),
|
||||
NewNamespace(SiteACluster, "SITE-A-equipment", "urn:zb:site-a:equipment"));
|
||||
|
||||
db.ClusterNodes.AddRange(
|
||||
NewNode(mainNodeId, MainCluster, "urn:zb:central:node-a"),
|
||||
NewNode(siteANodeId, SiteACluster, "urn:zb:site-a:node-b"));
|
||||
|
||||
db.DriverInstances.AddRange(
|
||||
NewDriver(MainDriverId, MainCluster, "MAIN-equipment"),
|
||||
NewDriver(SiteADriverId, SiteACluster, "SITE-A-equipment"));
|
||||
|
||||
await db.SaveChangesAsync(Ct);
|
||||
}
|
||||
|
||||
private static ServerCluster NewCluster(string clusterId, string name, string site) => new()
|
||||
{
|
||||
ClusterId = clusterId,
|
||||
Name = name,
|
||||
Enterprise = "zb",
|
||||
Site = site,
|
||||
NodeCount = 1,
|
||||
RedundancyMode = RedundancyMode.None,
|
||||
CreatedBy = "test",
|
||||
};
|
||||
|
||||
private static Namespace NewNamespace(string clusterId, string namespaceId, string uri) => new()
|
||||
{
|
||||
NamespaceId = namespaceId,
|
||||
ClusterId = clusterId,
|
||||
Kind = NamespaceKind.Equipment,
|
||||
NamespaceUri = uri,
|
||||
};
|
||||
|
||||
private static ClusterNode NewNode(string nodeId, string clusterId, string applicationUri) => new()
|
||||
{
|
||||
NodeId = nodeId,
|
||||
ClusterId = clusterId,
|
||||
Host = TwoNodeClusterHarness.LoopbackHost,
|
||||
ApplicationUri = applicationUri,
|
||||
CreatedBy = "test",
|
||||
};
|
||||
|
||||
private static DriverInstance NewDriver(string driverInstanceId, string clusterId, string namespaceId) => new()
|
||||
{
|
||||
DriverInstanceId = driverInstanceId,
|
||||
ClusterId = clusterId,
|
||||
NamespaceId = namespaceId,
|
||||
Name = driverInstanceId,
|
||||
DriverType = "ModbusTcp",
|
||||
Enabled = true,
|
||||
DriverConfig = "{}",
|
||||
};
|
||||
|
||||
private static async Task WaitForAsync(Func<Task<bool>> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
if (await condition()) return;
|
||||
await Task.Delay(200);
|
||||
}
|
||||
throw new TimeoutException($"Condition not met within {timeout}");
|
||||
}
|
||||
}
|
||||
@@ -90,6 +90,23 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable
|
||||
/// <summary>Gets the Akka ActorSystem for node B.</summary>
|
||||
public ActorSystem NodeBSystem => NodeB.Services.GetRequiredService<ActorSystem>();
|
||||
|
||||
/// <summary>
|
||||
/// The <c>host:port</c> identity each node's <c>DriverHostActor</c> / <c>NodeDeploymentState</c>
|
||||
/// row uses, derived the same way <c>ClusterRoleInfo</c> derives <c>LocalNode</c> from
|
||||
/// <c>Cluster:PublicHostname</c> + <c>Cluster:Port</c>. Seed a <c>ClusterNode.NodeId</c> with this
|
||||
/// value to bind a node to a logical ClusterId for multi-cluster scoping tests.
|
||||
/// </summary>
|
||||
public string NodeANodeId => $"{LoopbackHost}:{NodeAAkkaPort}";
|
||||
|
||||
/// <inheritdoc cref="NodeANodeId"/>
|
||||
public string NodeBNodeId => $"{LoopbackHost}:{NodeBAkkaPort}";
|
||||
|
||||
/// <summary>Opens a new <see cref="OtOpcUaConfigDbContext"/> over the shared ConfigDb (the same
|
||||
/// store both nodes read) so a test can seed clusters/nodes/drivers before triggering a deploy.</summary>
|
||||
/// <returns>A new DbContext the caller is responsible for disposing.</returns>
|
||||
public Task<OtOpcUaConfigDbContext> CreateConfigDbContextAsync()
|
||||
=> NodeA.Services.GetRequiredService<IDbContextFactory<OtOpcUaConfigDbContext>>().CreateDbContextAsync();
|
||||
|
||||
/// <summary>Boots both nodes and waits up to <paramref name="formationTimeout"/> for cluster convergence.</summary>
|
||||
/// <param name="formationTimeout">Maximum time to wait for cluster formation; defaults to 20 seconds if not provided.</param>
|
||||
public static async Task<TwoNodeClusterHarness> StartAsync(TimeSpan? formationTimeout = null)
|
||||
|
||||
Reference in New Issue
Block a user