test(host): deploy happy-path + idempotency integration tests (Task 59)

DeployHappyPathTests exercises the full deploy pipeline on the 2-node harness:
AdminOperationsActor → ConfigPublishCoordinator → DistributedPubSub →
DriverHostActor on both nodes → ApplyAck → coordinator seals. Verifies both
NodeDeploymentState rows reach Applied and Deployment.Status reaches Sealed.

Exposed + fixed two production bugs along the way:

1. Coordinator was publishing DispatchDeployment on the "deployments" topic but
   never subscribed to anything — DriverHostActor ACKs published on the same
   topic could not reach it. Added dedicated "deployment-acks" topic with
   coordinator subscription in PreStart, and DriverHostActor publishes ACKs
   there.

2. NodeId derivation used member.Address.Host only — two cluster members on a
   shared loopback host (test harness, dev VMs) collided to one identity. The
   coordinator's expected-ack set became {1} and the system sealed after only
   half the nodes acked. Switched to host:port everywhere (ClusterRoleInfo +
   coordinator) so loopback nodes stay distinct and production identities are
   harmlessly more specific.

Tests: 95 v2 tests pass (was 93 + 2 deploy tests), 0 skipped.

Failover scenarios (design §8 cases 3-7: node-kill-mid-apply, split-brain,
restart-during-deploy) deferred — they need controlled node-down primitives
on the harness. Tracked as F22 (failover scenario test cases).
This commit is contained in:
Joseph Doherty
2026-05-26 06:34:36 -04:00
parent 62e3cd6599
commit 5cfbe8b5dd
5 changed files with 158 additions and 18 deletions

View File

@@ -29,7 +29,10 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
{ {
_cluster = Akka.Cluster.Cluster.Get(system); _cluster = Akka.Cluster.Cluster.Get(system);
_logger = logger; _logger = logger;
_localNode = CommonsNodeId.Parse(options.Value.PublicHostname); // NodeId encodes host:port so cluster members on shared hosts (test loopback, dev VMs
// sharing a bind IP) stay distinct. Production hosts have unique DNS names so the port
// suffix is harmless redundancy.
_localNode = CommonsNodeId.Parse($"{options.Value.PublicHostname}:{options.Value.Port}");
_localRoles = new HashSet<string>(options.Value.Roles, StringComparer.Ordinal); _localRoles = new HashSet<string>(options.Value.Roles, StringComparer.Ordinal);
SeedFromCurrentState(); SeedFromCurrentState();
@@ -48,7 +51,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
{ {
if (!_membersByRole.TryGetValue(role, out var members)) return Array.Empty<CommonsNodeId>(); if (!_membersByRole.TryGetValue(role, out var members)) return Array.Empty<CommonsNodeId>();
return members return members
.Select(m => CommonsNodeId.Parse(m.Address.Host ?? string.Empty)) .Select(m => ToNodeId(m.Address))
.ToArray(); .ToArray();
} }
} }
@@ -58,7 +61,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
lock (_lock) lock (_lock)
{ {
return _roleLeaders.TryGetValue(role, out var leader) && leader is not null return _roleLeaders.TryGetValue(role, out var leader) && leader is not null
? CommonsNodeId.Parse(leader.Address.Host ?? string.Empty) ? ToNodeId(leader.Address)
: (CommonsNodeId?)null; : (CommonsNodeId?)null;
} }
} }
@@ -121,7 +124,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
{ {
_roleLeaders.TryGetValue(evt.Role, out var prevMember); _roleLeaders.TryGetValue(evt.Role, out var prevMember);
if (prevMember is not null) if (prevMember is not null)
previous = CommonsNodeId.Parse(prevMember.Address.Host ?? string.Empty); previous = ToNodeId(prevMember.Address);
var nextMember = evt.Leader is null var nextMember = evt.Leader is null
? null ? null
@@ -129,7 +132,7 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
_roleLeaders[evt.Role] = nextMember; _roleLeaders[evt.Role] = nextMember;
if (nextMember is not null) if (nextMember is not null)
next = CommonsNodeId.Parse(nextMember.Address.Host ?? string.Empty); next = ToNodeId(nextMember.Address);
raise = !Nullable.Equals(previous, next); raise = !Nullable.Equals(previous, next);
} }
@@ -150,6 +153,9 @@ public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
} }
} }
private static CommonsNodeId ToNodeId(Akka.Actor.Address address) =>
CommonsNodeId.Parse($"{address.Host ?? string.Empty}:{address.Port ?? 0}");
public void Dispose() public void Dispose()
{ {
_subscriber?.Tell(PoisonPill.Instance); _subscriber?.Tell(PoisonPill.Instance);

View File

@@ -24,6 +24,7 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators;
public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers
{ {
public const string DeploymentsTopic = "deployments"; public const string DeploymentsTopic = "deployments";
public const string DeploymentAcksTopic = "deployment-acks";
public static readonly TimeSpan DefaultApplyDeadline = TimeSpan.FromMinutes(2); public static readonly TimeSpan DefaultApplyDeadline = TimeSpan.FromMinutes(2);
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory; private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
@@ -50,6 +51,7 @@ public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers
Receive<DispatchDeployment>(HandleDispatch); Receive<DispatchDeployment>(HandleDispatch);
Receive<ApplyAck>(HandleAck); Receive<ApplyAck>(HandleAck);
Receive<DeadlineElapsed>(HandleDeadline); Receive<DeadlineElapsed>(HandleDeadline);
Receive<SubscribeAck>(_ => { /* DPS subscribe confirmation */ });
} }
/// <summary> /// <summary>
@@ -59,6 +61,10 @@ public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers
/// </summary> /// </summary>
protected override void PreStart() protected override void PreStart()
{ {
// Subscribe to per-node ApplyAck broadcasts so DriverHostActors on remote members can
// route their ACKs to whichever node currently hosts this singleton.
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentAcksTopic, Self));
using var db = _dbFactory.CreateDbContext(); using var db = _dbFactory.CreateDbContext();
var inflight = db.Deployments var inflight = db.Deployments
.Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks) .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
@@ -239,7 +245,9 @@ public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers
if (!member.Roles.Contains("driver")) continue; if (!member.Roles.Contains("driver")) continue;
var host = member.Address.Host; var host = member.Address.Host;
if (string.IsNullOrWhiteSpace(host)) continue; if (string.IsNullOrWhiteSpace(host)) continue;
nodes.Add(NodeId.Parse(host)); // Match ClusterRoleInfo's NodeId derivation (host:port) so DriverHostActor's
// self-identification and the coordinator's expected-ack set agree.
nodes.Add(NodeId.Parse($"{host}:{member.Address.Port ?? 0}"));
} }
return nodes; return nodes;
} }

View File

@@ -30,6 +30,7 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
public sealed class DriverHostActor : ReceiveActor, IWithTimers public sealed class DriverHostActor : ReceiveActor, IWithTimers
{ {
public const string DeploymentsTopic = "deployments"; public const string DeploymentsTopic = "deployments";
public const string DeploymentAcksTopic = "deployment-acks";
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30); public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory; private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
@@ -276,9 +277,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
} }
else else
{ {
// No direct coordinator handle — publish back through DistributedPubSub so the // No direct coordinator handle — publish on the dedicated ACK topic. The coordinator
// singleton routes it. The coordinator subscribes to its own incoming topic. // singleton subscribes there in PreStart so the ACK reaches whichever admin node hosts
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, ack)); // it without an actor-path lookup.
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentAcksTopic, ack));
} }
} }
} }

View File

@@ -0,0 +1,115 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
/// <summary>
/// End-to-end deploy: AdminOperationsActor → ConfigPublishCoordinator → DriverHostActor on both
/// nodes → ApplyAck → coordinator seals. Verifies the cluster singleton + DistributedPubSub
/// dispatch path actually works against the real Akka cluster the harness builds.
/// </summary>
public sealed class DeployHappyPathTests
{
private static CancellationToken Ct => TestContext.Current.CancellationToken;
[Fact]
public async Task StartDeployment_seals_after_both_nodes_apply()
{
await using var harness = await TwoNodeClusterHarness.StartAsync();
await using var scope = harness.NodeA.Services.CreateAsyncScope();
var client = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
var result = await client.StartDeploymentAsync(createdBy: "alice@test", Ct);
result.Outcome.ShouldBe(StartDeploymentOutcome.Accepted);
result.DeploymentId.ShouldNotBeNull();
var deploymentId = result.DeploymentId!.Value.Value;
// Wait up to 15s for coordinator to seal after both DriverHostActors ack.
await WaitForAsync(async () =>
{
await using var db = await CreateDbAsync(harness);
var d = await db.Deployments.AsNoTracking().FirstOrDefaultAsync(d => d.DeploymentId == deploymentId, Ct);
return d?.Status == DeploymentStatus.Sealed;
}, TimeSpan.FromSeconds(15));
await using var db = await CreateDbAsync(harness);
var deployment = await db.Deployments.AsNoTracking()
.FirstAsync(d => d.DeploymentId == deploymentId, Ct);
deployment.Status.ShouldBe(DeploymentStatus.Sealed);
deployment.SealedAtUtc.ShouldNotBeNull();
var nodeStates = await db.NodeDeploymentStates.AsNoTracking()
.Where(s => s.DeploymentId == deploymentId)
.ToListAsync(Ct);
nodeStates.Count.ShouldBe(2);
nodeStates.ShouldAllBe(s => s.Status == NodeDeploymentStatus.Applied);
}
[Fact]
public async Task Replaying_dispatch_to_same_revision_is_idempotent_no_op()
{
await using var harness = await TwoNodeClusterHarness.StartAsync();
await using var scope = harness.NodeA.Services.CreateAsyncScope();
var client = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
var first = await client.StartDeploymentAsync(createdBy: "alice@test", Ct);
first.Outcome.ShouldBe(StartDeploymentOutcome.Accepted);
await WaitForAsync(async () =>
{
await using var db = await CreateDbAsync(harness);
var d = await db.Deployments.AsNoTracking()
.FirstOrDefaultAsync(d => d.DeploymentId == first.DeploymentId!.Value.Value, Ct);
return d?.Status == DeploymentStatus.Sealed;
}, TimeSpan.FromSeconds(15));
// Same DB state → same revision hash → AdminOperations short-circuits with NoChanges,
// OR (if it accepts) the second dispatch is idempotent (DriverHostActor recognises
// _currentRevision == msg.RevisionHash and acks Applied without re-applying).
var second = await client.StartDeploymentAsync(createdBy: "alice@test", Ct);
if (second.Outcome == StartDeploymentOutcome.Accepted)
{
// Second deployment row should also seal quickly because both drivers immediate-ack.
await WaitForAsync(async () =>
{
await using var db = await CreateDbAsync(harness);
var d = await db.Deployments.AsNoTracking()
.FirstOrDefaultAsync(d => d.DeploymentId == second.DeploymentId!.Value.Value, Ct);
return d?.Status == DeploymentStatus.Sealed;
}, TimeSpan.FromSeconds(15));
second.RevisionHash!.Value.Value.ShouldBe(first.RevisionHash!.Value.Value);
}
else
{
second.Outcome.ShouldBeOneOf(StartDeploymentOutcome.NoChanges, StartDeploymentOutcome.AnotherDeploymentInFlight);
}
}
private static async Task<OtOpcUaConfigDbContext> CreateDbAsync(TwoNodeClusterHarness harness)
{
var factory = harness.NodeA.Services.GetRequiredService<IDbContextFactory<OtOpcUaConfigDbContext>>();
return await factory.CreateDbContextAsync();
}
private static async Task WaitForAsync(Func<Task<bool>> condition, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (DateTime.UtcNow < deadline)
{
if (await condition()) return;
await Task.Delay(200);
}
throw new TimeoutException($"Condition not met within {timeout}");
}
}

View File

@@ -43,7 +43,7 @@ namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
public sealed class TwoNodeClusterHarness : IAsyncDisposable public sealed class TwoNodeClusterHarness : IAsyncDisposable
{ {
public const string TestRoles = "admin,driver"; public const string TestRoles = "admin,driver";
public static readonly string SharedDbName = $"two-node-cluster-{Guid.NewGuid():N}"; public string SharedDbName { get; } = $"two-node-cluster-{Guid.NewGuid():N}";
public WebApplication NodeA { get; private set; } = null!; public WebApplication NodeA { get; private set; } = null!;
public WebApplication NodeB { get; private set; } = null!; public WebApplication NodeB { get; private set; } = null!;
@@ -51,6 +51,10 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable
public int NodeAAkkaPort { get; private set; } public int NodeAAkkaPort { get; private set; }
public int NodeBAkkaPort { get; private set; } public int NodeBAkkaPort { get; private set; }
// Both nodes bind to 127.0.0.1 — ClusterRoleInfo + ConfigPublishCoordinator encode
// host:port into NodeId so the cluster membership stays distinct on different ports.
public const string LoopbackHost = "127.0.0.1";
public ActorSystem NodeASystem => NodeA.Services.GetRequiredService<ActorSystem>(); public ActorSystem NodeASystem => NodeA.Services.GetRequiredService<ActorSystem>();
public ActorSystem NodeBSystem => NodeB.Services.GetRequiredService<ActorSystem>(); public ActorSystem NodeBSystem => NodeB.Services.GetRequiredService<ActorSystem>();
@@ -63,14 +67,18 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable
// Node A boots first as the seed. // Node A boots first as the seed.
harness.NodeA = await BuildNodeAsync( harness.NodeA = await BuildNodeAsync(
host: LoopbackHost,
akkaPort: harness.NodeAAkkaPort, akkaPort: harness.NodeAAkkaPort,
seedHost: LoopbackHost,
seedAkkaPort: harness.NodeAAkkaPort, seedAkkaPort: harness.NodeAAkkaPort,
dbName: SharedDbName); dbName: harness.SharedDbName);
harness.NodeB = await BuildNodeAsync( harness.NodeB = await BuildNodeAsync(
host: LoopbackHost,
akkaPort: harness.NodeBAkkaPort, akkaPort: harness.NodeBAkkaPort,
seedHost: LoopbackHost,
seedAkkaPort: harness.NodeAAkkaPort, seedAkkaPort: harness.NodeAAkkaPort,
dbName: SharedDbName); dbName: harness.SharedDbName);
await WaitForClusterFormationAsync( await WaitForClusterFormationAsync(
harness.NodeASystem, harness.NodeASystem,
@@ -80,18 +88,19 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable
return harness; return harness;
} }
private static async Task<WebApplication> BuildNodeAsync(int akkaPort, int seedAkkaPort, string dbName) private static async Task<WebApplication> BuildNodeAsync(
string host, int akkaPort, string seedHost, int seedAkkaPort, string dbName)
{ {
var builder = WebApplication.CreateBuilder(new WebApplicationOptions { Args = [] }); var builder = WebApplication.CreateBuilder(new WebApplicationOptions { Args = [] });
builder.WebHost.UseKestrel(o => o.Listen(System.Net.IPAddress.Loopback, 0)); builder.WebHost.UseKestrel(o => o.Listen(System.Net.IPAddress.Parse(host), 0));
builder.Configuration.AddInMemoryCollection(new Dictionary<string, string?> builder.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{ {
["ConnectionStrings:ConfigDb"] = "Server=test;Database=test;Trusted_Connection=True;TrustServerCertificate=True;", ["ConnectionStrings:ConfigDb"] = "Server=test;Database=test;Trusted_Connection=True;TrustServerCertificate=True;",
["Cluster:Hostname"] = "127.0.0.1", ["Cluster:Hostname"] = host,
["Cluster:Port"] = akkaPort.ToString(), ["Cluster:Port"] = akkaPort.ToString(),
["Cluster:PublicHostname"] = "127.0.0.1", ["Cluster:PublicHostname"] = host,
["Cluster:SeedNodes:0"] = $"akka.tcp://otopcua@127.0.0.1:{seedAkkaPort}", ["Cluster:SeedNodes:0"] = $"akka.tcp://otopcua@{seedHost}:{seedAkkaPort}",
["Cluster:Roles:0"] = "admin", ["Cluster:Roles:0"] = "admin",
["Cluster:Roles:1"] = "driver", ["Cluster:Roles:1"] = "driver",
["Security:Jwt:SigningKey"] = "two-node-harness-test-signing-key-with-enough-bytes-for-hs256", ["Security:Jwt:SigningKey"] = "two-node-harness-test-signing-key-with-enough-bytes-for-hs256",
@@ -149,7 +158,7 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable
private static int AllocateFreePort() private static int AllocateFreePort()
{ {
using var listener = new TcpListener(System.Net.IPAddress.Loopback, 0); using var listener = new TcpListener(System.Net.IPAddress.Parse(LoopbackHost), 0);
listener.Start(); listener.Start();
var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port; var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
listener.Stop(); listener.Stop();