diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs index b344c9a..ef1a145 100644 --- a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs @@ -29,7 +29,10 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable { _cluster = Akka.Cluster.Cluster.Get(system); _logger = logger; - _localNode = CommonsNodeId.Parse(options.Value.PublicHostname); + // NodeId encodes host:port so cluster members on shared hosts (test loopback, dev VMs + // sharing a bind IP) stay distinct. Production hosts have unique DNS names so the port + // suffix is harmless redundancy. + _localNode = CommonsNodeId.Parse($"{options.Value.PublicHostname}:{options.Value.Port}"); _localRoles = new HashSet(options.Value.Roles, StringComparer.Ordinal); SeedFromCurrentState(); @@ -48,7 +51,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable { if (!_membersByRole.TryGetValue(role, out var members)) return Array.Empty(); return members - .Select(m => CommonsNodeId.Parse(m.Address.Host ?? string.Empty)) + .Select(m => ToNodeId(m.Address)) .ToArray(); } } @@ -58,7 +61,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable lock (_lock) { return _roleLeaders.TryGetValue(role, out var leader) && leader is not null - ? CommonsNodeId.Parse(leader.Address.Host ?? string.Empty) + ? ToNodeId(leader.Address) : (CommonsNodeId?)null; } } @@ -121,7 +124,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable { _roleLeaders.TryGetValue(evt.Role, out var prevMember); if (prevMember is not null) - previous = CommonsNodeId.Parse(prevMember.Address.Host ?? string.Empty); + previous = ToNodeId(prevMember.Address); var nextMember = evt.Leader is null ? null @@ -129,7 +132,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable _roleLeaders[evt.Role] = nextMember; if (nextMember is not null) - next = CommonsNodeId.Parse(nextMember.Address.Host ?? string.Empty); + next = ToNodeId(nextMember.Address); raise = !Nullable.Equals(previous, next); } @@ -150,6 +153,9 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable } } + private static CommonsNodeId ToNodeId(Akka.Actor.Address address) => + CommonsNodeId.Parse($"{address.Host ?? string.Empty}:{address.Port ?? 0}"); + public void Dispose() { _subscriber?.Tell(PoisonPill.Instance); diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs index 306f18a..eb60711 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs @@ -24,6 +24,7 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators; public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers { public const string DeploymentsTopic = "deployments"; + public const string DeploymentAcksTopic = "deployment-acks"; public static readonly TimeSpan DefaultApplyDeadline = TimeSpan.FromMinutes(2); private readonly IDbContextFactory _dbFactory; @@ -50,6 +51,7 @@ public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers Receive(HandleDispatch); Receive(HandleAck); Receive(HandleDeadline); + Receive(_ => { /* DPS subscribe confirmation */ }); } /// @@ -59,6 +61,10 @@ public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers /// protected override void PreStart() { + // Subscribe to per-node ApplyAck broadcasts so DriverHostActors on remote members can + // route their ACKs to whichever node currently hosts this singleton. + DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentAcksTopic, Self)); + using var db = _dbFactory.CreateDbContext(); var inflight = db.Deployments .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks) @@ -239,7 +245,9 @@ public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers if (!member.Roles.Contains("driver")) continue; var host = member.Address.Host; if (string.IsNullOrWhiteSpace(host)) continue; - nodes.Add(NodeId.Parse(host)); + // Match ClusterRoleInfo's NodeId derivation (host:port) so DriverHostActor's + // self-identification and the coordinator's expected-ack set agree. + nodes.Add(NodeId.Parse($"{host}:{member.Address.Port ?? 0}")); } return nodes; } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 6908d16..ab521a7 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -30,6 +30,7 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; public sealed class DriverHostActor : ReceiveActor, IWithTimers { public const string DeploymentsTopic = "deployments"; + public const string DeploymentAcksTopic = "deployment-acks"; public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30); private readonly IDbContextFactory _dbFactory; @@ -276,9 +277,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers } else { - // No direct coordinator handle — publish back through DistributedPubSub so the - // singleton routes it. The coordinator subscribes to its own incoming topic. - DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, ack)); + // No direct coordinator handle — publish on the dedicated ACK topic. The coordinator + // singleton subscribes there in PreStart so the ACK reaches whichever admin node hosts + // it without an actor-path lookup. + DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentAcksTopic, ack)); } } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DeployHappyPathTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DeployHappyPathTests.cs new file mode 100644 index 0000000..21d5445 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/DeployHappyPathTests.cs @@ -0,0 +1,115 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; + +/// +/// End-to-end deploy: AdminOperationsActor → ConfigPublishCoordinator → DriverHostActor on both +/// nodes → ApplyAck → coordinator seals. Verifies the cluster singleton + DistributedPubSub +/// dispatch path actually works against the real Akka cluster the harness builds. +/// +public sealed class DeployHappyPathTests +{ + private static CancellationToken Ct => TestContext.Current.CancellationToken; + + [Fact] + public async Task StartDeployment_seals_after_both_nodes_apply() + { + await using var harness = await TwoNodeClusterHarness.StartAsync(); + + await using var scope = harness.NodeA.Services.CreateAsyncScope(); + var client = scope.ServiceProvider.GetRequiredService(); + + var result = await client.StartDeploymentAsync(createdBy: "alice@test", Ct); + + result.Outcome.ShouldBe(StartDeploymentOutcome.Accepted); + result.DeploymentId.ShouldNotBeNull(); + var deploymentId = result.DeploymentId!.Value.Value; + + // Wait up to 15s for coordinator to seal after both DriverHostActors ack. + await WaitForAsync(async () => + { + await using var db = await CreateDbAsync(harness); + var d = await db.Deployments.AsNoTracking().FirstOrDefaultAsync(d => d.DeploymentId == deploymentId, Ct); + return d?.Status == DeploymentStatus.Sealed; + }, TimeSpan.FromSeconds(15)); + + await using var db = await CreateDbAsync(harness); + var deployment = await db.Deployments.AsNoTracking() + .FirstAsync(d => d.DeploymentId == deploymentId, Ct); + deployment.Status.ShouldBe(DeploymentStatus.Sealed); + deployment.SealedAtUtc.ShouldNotBeNull(); + + var nodeStates = await db.NodeDeploymentStates.AsNoTracking() + .Where(s => s.DeploymentId == deploymentId) + .ToListAsync(Ct); + nodeStates.Count.ShouldBe(2); + nodeStates.ShouldAllBe(s => s.Status == NodeDeploymentStatus.Applied); + } + + [Fact] + public async Task Replaying_dispatch_to_same_revision_is_idempotent_no_op() + { + await using var harness = await TwoNodeClusterHarness.StartAsync(); + + await using var scope = harness.NodeA.Services.CreateAsyncScope(); + var client = scope.ServiceProvider.GetRequiredService(); + + var first = await client.StartDeploymentAsync(createdBy: "alice@test", Ct); + first.Outcome.ShouldBe(StartDeploymentOutcome.Accepted); + + await WaitForAsync(async () => + { + await using var db = await CreateDbAsync(harness); + var d = await db.Deployments.AsNoTracking() + .FirstOrDefaultAsync(d => d.DeploymentId == first.DeploymentId!.Value.Value, Ct); + return d?.Status == DeploymentStatus.Sealed; + }, TimeSpan.FromSeconds(15)); + + // Same DB state → same revision hash → AdminOperations short-circuits with NoChanges, + // OR (if it accepts) the second dispatch is idempotent (DriverHostActor recognises + // _currentRevision == msg.RevisionHash and acks Applied without re-applying). + var second = await client.StartDeploymentAsync(createdBy: "alice@test", Ct); + + if (second.Outcome == StartDeploymentOutcome.Accepted) + { + // Second deployment row should also seal quickly because both drivers immediate-ack. + await WaitForAsync(async () => + { + await using var db = await CreateDbAsync(harness); + var d = await db.Deployments.AsNoTracking() + .FirstOrDefaultAsync(d => d.DeploymentId == second.DeploymentId!.Value.Value, Ct); + return d?.Status == DeploymentStatus.Sealed; + }, TimeSpan.FromSeconds(15)); + + second.RevisionHash!.Value.Value.ShouldBe(first.RevisionHash!.Value.Value); + } + else + { + second.Outcome.ShouldBeOneOf(StartDeploymentOutcome.NoChanges, StartDeploymentOutcome.AnotherDeploymentInFlight); + } + } + + private static async Task CreateDbAsync(TwoNodeClusterHarness harness) + { + var factory = harness.NodeA.Services.GetRequiredService>(); + return await factory.CreateDbContextAsync(); + } + + private static async Task WaitForAsync(Func> condition, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (await condition()) return; + await Task.Delay(200); + } + throw new TimeoutException($"Condition not met within {timeout}"); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs index 847c03d..222f3bf 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs @@ -43,7 +43,7 @@ namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; public sealed class TwoNodeClusterHarness : IAsyncDisposable { public const string TestRoles = "admin,driver"; - public static readonly string SharedDbName = $"two-node-cluster-{Guid.NewGuid():N}"; + public string SharedDbName { get; } = $"two-node-cluster-{Guid.NewGuid():N}"; public WebApplication NodeA { get; private set; } = null!; public WebApplication NodeB { get; private set; } = null!; @@ -51,6 +51,10 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable public int NodeAAkkaPort { get; private set; } public int NodeBAkkaPort { get; private set; } + // Both nodes bind to 127.0.0.1 — ClusterRoleInfo + ConfigPublishCoordinator encode + // host:port into NodeId so the cluster membership stays distinct on different ports. + public const string LoopbackHost = "127.0.0.1"; + public ActorSystem NodeASystem => NodeA.Services.GetRequiredService(); public ActorSystem NodeBSystem => NodeB.Services.GetRequiredService(); @@ -63,14 +67,18 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable // Node A boots first as the seed. harness.NodeA = await BuildNodeAsync( + host: LoopbackHost, akkaPort: harness.NodeAAkkaPort, + seedHost: LoopbackHost, seedAkkaPort: harness.NodeAAkkaPort, - dbName: SharedDbName); + dbName: harness.SharedDbName); harness.NodeB = await BuildNodeAsync( + host: LoopbackHost, akkaPort: harness.NodeBAkkaPort, + seedHost: LoopbackHost, seedAkkaPort: harness.NodeAAkkaPort, - dbName: SharedDbName); + dbName: harness.SharedDbName); await WaitForClusterFormationAsync( harness.NodeASystem, @@ -80,18 +88,19 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable return harness; } - private static async Task BuildNodeAsync(int akkaPort, int seedAkkaPort, string dbName) + private static async Task BuildNodeAsync( + string host, int akkaPort, string seedHost, int seedAkkaPort, string dbName) { var builder = WebApplication.CreateBuilder(new WebApplicationOptions { Args = [] }); - builder.WebHost.UseKestrel(o => o.Listen(System.Net.IPAddress.Loopback, 0)); + builder.WebHost.UseKestrel(o => o.Listen(System.Net.IPAddress.Parse(host), 0)); builder.Configuration.AddInMemoryCollection(new Dictionary { ["ConnectionStrings:ConfigDb"] = "Server=test;Database=test;Trusted_Connection=True;TrustServerCertificate=True;", - ["Cluster:Hostname"] = "127.0.0.1", + ["Cluster:Hostname"] = host, ["Cluster:Port"] = akkaPort.ToString(), - ["Cluster:PublicHostname"] = "127.0.0.1", - ["Cluster:SeedNodes:0"] = $"akka.tcp://otopcua@127.0.0.1:{seedAkkaPort}", + ["Cluster:PublicHostname"] = host, + ["Cluster:SeedNodes:0"] = $"akka.tcp://otopcua@{seedHost}:{seedAkkaPort}", ["Cluster:Roles:0"] = "admin", ["Cluster:Roles:1"] = "driver", ["Security:Jwt:SigningKey"] = "two-node-harness-test-signing-key-with-enough-bytes-for-hs256", @@ -149,7 +158,7 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable private static int AllocateFreePort() { - using var listener = new TcpListener(System.Net.IPAddress.Loopback, 0); + using var listener = new TcpListener(System.Net.IPAddress.Parse(LoopbackHost), 0); listener.Start(); var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port; listener.Stop();