test(host): 2-node integration test harness + consolidate to one ActorSystem (Task 58)

Builds TwoNodeClusterHarness: two in-process Host-equivalent nodes sharing
an in-memory ConfigDb. Forms a 2-member Akka cluster. ClusterFormationTests
proves both nodes see each other as admin+driver role members.

Fixes a real production bug uncovered while wiring the harness — Program.cs
ran two separate ActorSystems (one from AddOtOpcUaCluster.AkkaHostedService
with cluster HOCON, one from Akka.Hosting.AddAkka with bare HOCON). Cluster
singletons landed on the bare ActorSystem and could not actually form a
cluster ("Configuration does not contain `akka.cluster` node").

Consolidation:
- AddOtOpcUaCluster now only binds AkkaClusterOptions + registers IClusterRoleInfo
- New WithOtOpcUaClusterBootstrap pushes embedded HOCON + Remote/Cluster options
  into Akka.Hosting's AkkaConfigurationBuilder
- AkkaHostedService.cs deleted — Akka.Hosting now owns the lifecycle
- Program.cs + harness call WithOtOpcUaClusterBootstrap inside AddAkka

Why not WebApplicationFactory<Program>? Program.cs reads OTOPCUA_ROLES from
process env (shared across in-process WAFs); the harness replays Program.cs's
DI graph from a clean WebApplicationBuilder per node with per-node config
overrides. Same production extensions, isolated config + Kestrel + Akka ports.

Tests: 93 v2 tests pass (was 91 + 2 new cluster formation), 0 skipped.
This commit is contained in:
Joseph Doherty
2026-05-26 06:27:04 -04:00
parent bb353c4d43
commit d6fac2d81d
7 changed files with 305 additions and 106 deletions

View File

@@ -62,6 +62,7 @@
</Folder> </Folder>
<Folder Name="/tests/Server/"> <Folder Name="/tests/Server/">
<Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.csproj" /> <Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.csproj" />
<Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests.csproj" />
<Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests.csproj" /> <Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests.csproj" />
<Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ZB.MOM.WW.OtOpcUa.Runtime.Tests.csproj" /> <Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ZB.MOM.WW.OtOpcUa.Runtime.Tests.csproj" />
<Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests/ZB.MOM.WW.OtOpcUa.Security.Tests.csproj" /> <Project Path="tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests/ZB.MOM.WW.OtOpcUa.Security.Tests.csproj" />

View File

@@ -1,97 +0,0 @@
using Akka.Actor;
using Akka.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ZB.MOM.WW.OtOpcUa.Cluster;
/// <summary>
/// Starts the local <see cref="ActorSystem"/>, applies the embedded HOCON plus an overlay
/// generated from <see cref="AkkaClusterOptions"/>, and joins the cluster. On shutdown,
/// runs <c>CoordinatedShutdown</c> with the <c>ClusterLeavingReason</c> so the local node
/// leaves the cluster cleanly before the process exits.
/// </summary>
public sealed class AkkaHostedService : IHostedService
{
    private readonly AkkaClusterOptions _options;
    private readonly ILogger<AkkaHostedService> _logger;
    private ActorSystem? _actorSystem;

    /// <summary>Captures the bound cluster options and the logger used for lifecycle messages.</summary>
    /// <param name="options">Options wrapper; its <c>Value</c> is read once, here.</param>
    /// <param name="logger">Logger for start/stop diagnostics.</param>
    public AkkaHostedService(IOptions<AkkaClusterOptions> options, ILogger<AkkaHostedService> logger)
    {
        (_options, _logger) = (options.Value, logger);
    }

    /// <summary>The running actor system.</summary>
    /// <exception cref="InvalidOperationException">Read before <see cref="StartAsync"/> has created the system.</exception>
    public ActorSystem ActorSystem
    {
        get
        {
            if (_actorSystem is null)
            {
                throw new InvalidOperationException(
                    "ActorSystem requested before AkkaHostedService.StartAsync ran.");
            }

            return _actorSystem;
        }
    }

    /// <summary>
    /// Creates the actor system from the options-derived overlay layered over the base HOCON
    /// returned by <c>HoconLoader.LoadBaseConfig()</c>, then joins the configured seed nodes
    /// when at least one is present.
    /// </summary>
    /// <param name="cancellationToken">Not observed; the work here is synchronous.</param>
    /// <returns>An already-completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        var overlayHocon = BuildOverlay(_options);
        var embedded = ConfigurationFactory.ParseString(HoconLoader.LoadBaseConfig());
        // Overlay first so its values win; the base config only fills the gaps.
        var effective = ConfigurationFactory.ParseString(overlayHocon).WithFallback(embedded);

        _logger.LogInformation(
            "Starting ActorSystem '{System}' on {Host}:{Port} with roles=[{Roles}]",
            _options.SystemName, _options.PublicHostname, _options.Port,
            string.Join(",", _options.Roles));

        var system = ActorSystem.Create(_options.SystemName, effective);
        _actorSystem = system;

        if (_options.SeedNodes.Length == 0)
        {
            return Task.CompletedTask;
        }

        var seedAddresses = new List<Address>(_options.SeedNodes.Length);
        foreach (var seed in _options.SeedNodes)
        {
            seedAddresses.Add(Address.Parse(seed));
        }

        Akka.Cluster.Cluster.Get(system).JoinSeedNodes(seedAddresses);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Runs <c>CoordinatedShutdown</c> with the cluster-leaving reason. The wait is bounded by a
    /// token linked to <paramref name="cancellationToken"/> that also fires after 30 seconds;
    /// when that wait is cancelled the actor system is terminated directly.
    /// </summary>
    /// <param name="cancellationToken">Host shutdown token; linked into the bounded wait.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        var system = _actorSystem;
        if (system is null)
        {
            // StartAsync never created a system, so there is nothing to shut down.
            return;
        }

        _logger.LogInformation("Initiating cluster-leave CoordinatedShutdown");
        var coordinator = CoordinatedShutdown.Get(system);

        using var timeoutSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutSource.CancelAfter(TimeSpan.FromSeconds(30));

        try
        {
            var leaving = coordinator.Run(CoordinatedShutdown.ClusterLeavingReason.Instance);
            await leaving.WaitAsync(timeoutSource.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            _logger.LogWarning("Cluster leave timed out after 30s; forcing terminate");
            await system.Terminate().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Renders the HOCON overlay carrying the remote endpoint (hostname, port, public hostname)
    /// and the cluster seed-node and role lists taken from <paramref name="o"/>.
    /// </summary>
    private static string BuildOverlay(AkkaClusterOptions o)
    {
        var seedList = string.Join(",", o.SeedNodes.Select(seed => Quote(seed)));
        var roleList = string.Join(",", o.Roles.Select(role => Quote(role)));

        return $@"
akka {{
remote.dot-netty.tcp {{
hostname = {Quote(o.Hostname)}
port = {o.Port}
public-hostname = {Quote(o.PublicHostname)}
}}
cluster {{
seed-nodes = [{seedList}]
roles = [{roleList}]
}}
}}";
    }

    /// <summary>
    /// Wraps <paramref name="value"/> in double quotes, escaping backslashes and embedded double
    /// quotes; <c>null</c> is rendered as an empty quoted string.
    /// </summary>
    private static string Quote(string? value)
    {
        var raw = value ?? string.Empty;
        // Backslashes must be doubled before quotes are escaped, otherwise the escape
        // characters introduced for quotes would themselves be doubled.
        var escaped = raw.Replace("\\", "\\\\").Replace("\"", "\\\"");
        return "\"" + escaped + "\"";
    }
}

View File

@@ -1,7 +1,9 @@
using Akka.Actor; using Akka.Cluster.Hosting;
using Akka.Hosting;
using Akka.Remote.Hosting;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Options;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
namespace ZB.MOM.WW.OtOpcUa.Cluster; namespace ZB.MOM.WW.OtOpcUa.Cluster;
@@ -9,20 +11,57 @@ namespace ZB.MOM.WW.OtOpcUa.Cluster;
public static class ServiceCollectionExtensions public static class ServiceCollectionExtensions
{ {
/// <summary> /// <summary>
/// Registers the Akka cluster hosted service and exposes <see cref="ActorSystem"/> and /// Binds <see cref="AkkaClusterOptions"/> and registers <see cref="IClusterRoleInfo"/>. The
/// <see cref="IClusterRoleInfo"/> as singletons resolved from it. Call after binding /// actual ActorSystem + cluster bootstrap is layered on inside the host's <c>AddAkka(...)</c>
/// <c>OTOPCUA_ROLES</c> into <c>AkkaClusterOptions.Roles</c> via the calling Program.cs. /// configurator via <see cref="WithOtOpcUaClusterBootstrap"/> — keeping the entire Akka graph
/// under Akka.Hosting's management so cluster singletons land on the same ActorSystem.
/// </summary> /// </summary>
public static IServiceCollection AddOtOpcUaCluster(this IServiceCollection services, IConfiguration configuration) public static IServiceCollection AddOtOpcUaCluster(this IServiceCollection services, IConfiguration configuration)
{ {
services.AddOptions<AkkaClusterOptions>() services.AddOptions<AkkaClusterOptions>()
.Bind(configuration.GetSection(AkkaClusterOptions.SectionName)); .Bind(configuration.GetSection(AkkaClusterOptions.SectionName));
services.AddSingleton<AkkaHostedService>();
services.AddHostedService(sp => sp.GetRequiredService<AkkaHostedService>());
services.AddSingleton<ActorSystem>(sp => sp.GetRequiredService<AkkaHostedService>().ActorSystem);
services.AddSingleton<IClusterRoleInfo, ClusterRoleInfo>(); services.AddSingleton<IClusterRoleInfo, ClusterRoleInfo>();
return services; return services;
} }
/// <summary>
/// Layers the OtOpcUa cluster bootstrap onto an Akka.Hosting builder: the embedded base HOCON
/// (split-brain resolver, pinned dispatcher, failure detector tuning) is appended, then the
/// remote endpoint and the cluster seed nodes/roles are applied from the bound
/// <see cref="AkkaClusterOptions"/>.
///
/// Typical wiring from Program.cs:
/// <code>
/// services.AddAkka("otopcua", (ab, sp) =>
/// {
/// ab.WithOtOpcUaClusterBootstrap(sp);
/// if (hasAdmin) ab.WithOtOpcUaControlPlaneSingletons();
/// if (hasDriver) ab.WithOtOpcUaRuntimeActors();
/// });
/// </code>
/// </summary>
/// <param name="builder">The Akka.Hosting configuration builder being populated.</param>
/// <param name="serviceProvider">Provider used to resolve <c>IOptions&lt;AkkaClusterOptions&gt;</c>.</param>
/// <returns>The same <paramref name="builder"/>, for chaining.</returns>
public static AkkaConfigurationBuilder WithOtOpcUaClusterBootstrap(
    this AkkaConfigurationBuilder builder,
    IServiceProvider serviceProvider)
{
    var clusterSettings = serviceProvider.GetRequiredService<IOptions<AkkaClusterOptions>>().Value;

    var remoting = new RemoteOptions
    {
        HostName = clusterSettings.Hostname,
        Port = clusterSettings.Port,
        PublicHostName = clusterSettings.PublicHostname,
    };

    var clustering = new ClusterOptions
    {
        SeedNodes = clusterSettings.SeedNodes,
        Roles = clusterSettings.Roles,
    };

    return builder
        .AddHocon(HoconLoader.LoadBaseConfig(), HoconAddMode.Append)
        .WithRemoting(remoting)
        .WithClustering(clustering);
}
} }

View File

@@ -40,8 +40,9 @@ builder.Services.AddOtOpcUaCluster(builder.Configuration);
// Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder // Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder
// from inside the configurator lambda. AddAkka spins the ActorSystem at host start. // from inside the configurator lambda. AddAkka spins the ActorSystem at host start.
builder.Services.AddAkka("otopcua", (ab, _) => builder.Services.AddAkka("otopcua", (ab, sp) =>
{ {
ab.WithOtOpcUaClusterBootstrap(sp);
if (hasAdmin) if (hasAdmin)
ab.WithOtOpcUaControlPlaneSingletons(); ab.WithOtOpcUaControlPlaneSingletons();
if (hasDriver) if (hasDriver)

View File

@@ -0,0 +1,43 @@
using Akka.Cluster;
using Shouldly;
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
/// <summary>
/// Smoke test: verifies <see cref="TwoNodeClusterHarness"/> boots two nodes and they form
/// a 2-member cluster with the expected role topology. Failover + deploy scenarios layer
/// on top in Task 59.
/// </summary>
public sealed class ClusterFormationTests
{
    /// <summary>
    /// Both nodes must report exactly two <see cref="MemberStatus.Up"/> members, and node A's
    /// membership view must include both the <c>admin</c> and <c>driver</c> roles.
    /// </summary>
    [Fact]
    public async Task Two_nodes_form_a_2_member_cluster()
    {
        await using var harness = await TwoNodeClusterHarness.StartAsync();

        var aCluster = Akka.Cluster.Cluster.Get(harness.NodeASystem);
        var bCluster = Akka.Cluster.Cluster.Get(harness.NodeBSystem);

        aCluster.State.Members.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);
        bCluster.State.Members.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);

        var aRoles = aCluster.State.Members.SelectMany(m => m.Roles).Distinct().ToHashSet();
        aRoles.ShouldContain("admin");
        aRoles.ShouldContain("driver");
    }

    /// <summary>
    /// Each node's own membership view must list two Up members for every role. Checking node B
    /// as well as node A is what makes this a "both nodes" assertion — a one-sided view (A sees B
    /// but B has not yet learned A's roles) would otherwise slip through.
    /// </summary>
    [Fact]
    public async Task Both_nodes_see_each_other_as_role_members()
    {
        await using var harness = await TwoNodeClusterHarness.StartAsync();

        var views = new[]
        {
            (Name: "A", Node: harness.NodeASystem),
            (Name: "B", Node: harness.NodeBSystem),
        };

        foreach (var (name, node) in views)
        {
            // Take one snapshot per node so both role counts are computed from the same state.
            var members = Akka.Cluster.Cluster.Get(node).State.Members;

            foreach (var role in new[] { "driver", "admin" })
            {
                members
                    .Count(m => m.Roles.Contains(role) && m.Status == MemberStatus.Up)
                    .ShouldBe(2, $"node {name} should see 2 Up members with role '{role}'");
            }
        }
    }
}

View File

@@ -0,0 +1,176 @@
using System.Net.Sockets;
using Akka.Actor;
using Akka.Cluster;
using Akka.Hosting;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using ZB.MOM.WW.OtOpcUa.AdminUI;
using ZB.MOM.WW.OtOpcUa.AdminUI.Clients;
using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;
using ZB.MOM.WW.OtOpcUa.Cluster;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.ControlPlane;
using ZB.MOM.WW.OtOpcUa.Host.Health;
using ZB.MOM.WW.OtOpcUa.Runtime;
using ZB.MOM.WW.OtOpcUa.Security;
using ZB.MOM.WW.OtOpcUa.Security.Endpoints;
using ZB.MOM.WW.OtOpcUa.Security.Ldap;
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
/// <summary>
/// Spins up two in-process <c>OtOpcUa.Host</c>-equivalent <see cref="WebApplication"/> instances
/// that share an in-memory <see cref="OtOpcUaConfigDbContext"/> and form a 2-member Akka cluster.
/// Both nodes carry the <c>admin</c> + <c>driver</c> roles, matching design §8's failover-test
/// 2-node profile.
///
/// Why not <c>WebApplicationFactory&lt;Program&gt;</c>?
/// Program.cs reads <c>OTOPCUA_ROLES</c> from process env (shared across in-process WAF
/// instances) and writes both Serilog file sinks + Akka cluster TCP listener to the host
/// process — neither survives two parallel WAFs cleanly. This harness instead replays the
/// Program.cs DI graph from a clean <see cref="WebApplicationBuilder"/> per node with
/// per-node config overrides. The production wiring is the same set of extensions
/// (<see cref="ServiceCollectionExtensions.AddOtOpcUaConfigDb"/>,
/// <see cref="AkkaCluster.ServiceCollectionExtensions.AddOtOpcUaCluster"/>,
/// <see cref="AddOtOpcUaAuth"/>, <see cref="AddOtOpcUaHealth"/>,
/// <see cref="WithOtOpcUaControlPlaneSingletons"/>,
/// <see cref="WithOtOpcUaRuntimeActors"/>).
/// </summary>
public sealed class TwoNodeClusterHarness : IAsyncDisposable
{
    /// <summary>Comma-separated role list applied to both harness nodes.</summary>
    public const string TestRoles = "admin,driver";

    /// <summary>
    /// Name of the EF Core in-memory database both nodes are registered against. It is a static
    /// initialised once per process, so every harness created in the same test run uses the
    /// same database name.
    /// </summary>
    public static readonly string SharedDbName = $"two-node-cluster-{Guid.NewGuid():N}";

    /// <summary>First node; it lists its own Akka port as the seed.</summary>
    public WebApplication NodeA { get; private set; } = null!;

    /// <summary>Second node; it joins via node A's Akka port.</summary>
    public WebApplication NodeB { get; private set; } = null!;

    /// <summary>Akka remoting port assigned to node A.</summary>
    public int NodeAAkkaPort { get; private set; }

    /// <summary>Akka remoting port assigned to node B.</summary>
    public int NodeBAkkaPort { get; private set; }

    /// <summary>The <see cref="ActorSystem"/> resolved from node A's service provider.</summary>
    public ActorSystem NodeASystem => NodeA.Services.GetRequiredService<ActorSystem>();

    /// <summary>The <see cref="ActorSystem"/> resolved from node B's service provider.</summary>
    public ActorSystem NodeBSystem => NodeB.Services.GetRequiredService<ActorSystem>();

    /// <summary>Boots both nodes and waits up to <paramref name="formationTimeout"/> for cluster convergence.</summary>
    /// <param name="formationTimeout">Maximum time to wait for both nodes to report two Up members; defaults to 20 seconds.</param>
    /// <returns>A started harness; the caller owns it and must dispose it.</returns>
    /// <exception cref="TimeoutException">The cluster did not converge in time. Any node already started is disposed first.</exception>
    public static async Task<TwoNodeClusterHarness> StartAsync(TimeSpan? formationTimeout = null)
    {
        var harness = new TwoNodeClusterHarness();
        try
        {
            harness.NodeAAkkaPort = AllocateFreePort();

            // Each probe releases its port before returning, so nothing guarantees two
            // successive probes differ; retry until node B's port is distinct from node A's.
            do
            {
                harness.NodeBAkkaPort = AllocateFreePort();
            }
            while (harness.NodeBAkkaPort == harness.NodeAAkkaPort);

            // Node A boots first as the seed.
            harness.NodeA = await BuildNodeAsync(
                akkaPort: harness.NodeAAkkaPort,
                seedAkkaPort: harness.NodeAAkkaPort,
                dbName: SharedDbName);

            harness.NodeB = await BuildNodeAsync(
                akkaPort: harness.NodeBAkkaPort,
                seedAkkaPort: harness.NodeAAkkaPort,
                dbName: SharedDbName);

            await WaitForClusterFormationAsync(
                harness.NodeASystem,
                harness.NodeBSystem,
                formationTimeout ?? TimeSpan.FromSeconds(20));

            return harness;
        }
        catch
        {
            // The caller never receives the harness on failure, so its `await using` cannot
            // clean up. Dispose whatever was started here to avoid leaking a live node
            // (ActorSystem + bound ports) into later tests.
            await harness.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Builds and starts one node: a fresh <see cref="WebApplicationBuilder"/> with per-node
    /// in-memory configuration, the in-memory ConfigDb registration, the Akka bootstrap plus
    /// control-plane and runtime actors, auth, admin UI, SignalR, admin clients and health.
    /// </summary>
    /// <param name="akkaPort">Akka remoting port this node binds.</param>
    /// <param name="seedAkkaPort">Akka port of the seed node this node joins through.</param>
    /// <param name="dbName">EF Core in-memory database name.</param>
    /// <returns>The started application.</returns>
    private static async Task<WebApplication> BuildNodeAsync(int akkaPort, int seedAkkaPort, string dbName)
    {
        var builder = WebApplication.CreateBuilder(new WebApplicationOptions { Args = [] });

        // Port 0 lets the OS pick a free HTTP port, so the two nodes never collide on Kestrel.
        builder.WebHost.UseKestrel(o => o.Listen(System.Net.IPAddress.Loopback, 0));

        var settings = new Dictionary<string, string?>
        {
            // NOTE(review): placeholder connection string — presumably never opened because the
            // DbContext is registered with the in-memory provider below; confirm.
            ["ConnectionStrings:ConfigDb"] = "Server=test;Database=test;Trusted_Connection=True;TrustServerCertificate=True;",
            ["Cluster:Hostname"] = "127.0.0.1",
            ["Cluster:Port"] = akkaPort.ToString(System.Globalization.CultureInfo.InvariantCulture),
            ["Cluster:PublicHostname"] = "127.0.0.1",
            ["Cluster:SeedNodes:0"] = $"akka.tcp://otopcua@127.0.0.1:{seedAkkaPort}",
            ["Security:Jwt:SigningKey"] = "two-node-harness-test-signing-key-with-enough-bytes-for-hs256",
            ["Security:Jwt:Issuer"] = "otopcua-test",
            ["Security:Jwt:Audience"] = "otopcua-test",
        };

        // Derive the role entries from TestRoles so the constant stays the single source of truth.
        var roles = TestRoles.Split(',');
        for (var i = 0; i < roles.Length; i++)
        {
            settings[$"Cluster:Roles:{i}"] = roles[i];
        }

        builder.Configuration.AddInMemoryCollection(settings);

        // Replicate Program.cs role wiring with the harness-shared in-memory ConfigDb.
        builder.Services.AddDbContextFactory<OtOpcUaConfigDbContext>(opt => opt.UseInMemoryDatabase(dbName));
        builder.Services.AddDbContext<OtOpcUaConfigDbContext>(opt => opt.UseInMemoryDatabase(dbName));
        builder.Services.AddOtOpcUaCluster(builder.Configuration);
        builder.Services.AddAkka("otopcua", (ab, sp) =>
        {
            ab.WithOtOpcUaClusterBootstrap(sp);
            ab.WithOtOpcUaControlPlaneSingletons();
            ab.WithOtOpcUaRuntimeActors();
        });
        builder.Services.AddOtOpcUaAuth(builder.Configuration);
        builder.Services.AddSingleton<ILdapAuthService, StubLdapAuthService>();
        builder.Services.AddAdminUI();
        builder.Services.AddSignalR();
        builder.Services.AddOtOpcUaAdminClients();
        builder.Services.AddOtOpcUaHealth();

        var app = builder.Build();
        try
        {
            app.UseAuthentication();
            app.UseAuthorization();
            app.MapOtOpcUaAuth();
            app.MapOtOpcUaHubs();
            app.MapOtOpcUaHealth();

            await app.StartAsync();
            return app;
        }
        catch
        {
            // The app is not handed back on failure, so release it here.
            await app.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Polls every 200 ms until both actor systems report at least two Up members, or throws
    /// once <paramref name="timeout"/> has elapsed.
    /// </summary>
    /// <exception cref="TimeoutException">Convergence was not observed within <paramref name="timeout"/>.</exception>
    private static async Task WaitForClusterFormationAsync(ActorSystem a, ActorSystem b, TimeSpan timeout)
    {
        static int CountUp(ActorSystem system) =>
            Akka.Cluster.Cluster.Get(system).State.Members.Count(m => m.Status == MemberStatus.Up);

        var deadline = DateTime.UtcNow + timeout;
        while (DateTime.UtcNow < deadline)
        {
            if (CountUp(a) >= 2 && CountUp(b) >= 2)
            {
                return;
            }

            await Task.Delay(200);
        }

        throw new TimeoutException(
            $"Cluster did not form within {timeout}. " +
            $"A up={CountUp(a)}, " +
            $"B up={CountUp(b)}");
    }

    /// <summary>
    /// Asks the OS for an ephemeral loopback port by binding a listener to port 0, reading the
    /// assigned port, and releasing it. The port is free at return time but not reserved.
    /// </summary>
    private static int AllocateFreePort()
    {
        using var listener = new TcpListener(System.Net.IPAddress.Loopback, 0);
        listener.Start();
        var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
        listener.Stop();
        return port;
    }

    /// <summary>
    /// Disposes node B, then node A. Node A is disposed even when disposing node B throws, and
    /// nodes that were never started are skipped.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            // Null is possible despite the non-nullable declaration: StartAsync disposes a
            // partially-initialised harness when startup fails.
            if (NodeB is not null) await NodeB.DisposeAsync();
        }
        finally
        {
            if (NodeA is not null) await NodeA.DisposeAsync();
        }
    }

    /// <summary>
    /// LDAP stand-in for the harness: authentication succeeds only for the password
    /// <c>valid-password</c>, and every result carries the <c>FleetAdmin</c> group and role.
    /// </summary>
    private sealed class StubLdapAuthService : ILdapAuthService
    {
        public Task<LdapAuthResult> AuthenticateAsync(string username, string password, CancellationToken ct = default)
            => Task.FromResult(new LdapAuthResult(
                Success: password == "valid-password",
                DisplayName: username,
                Username: username,
                Groups: ["FleetAdmin"],
                Roles: ["FleetAdmin"],
                Error: null));
    }
}

View File

@@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<!-- Integration-test project for the OtOpcUa host (two-node cluster harness and its tests). -->
<PropertyGroup>
<!-- Test-only assembly: not packable, flagged as a test project, warnings fail the build. -->
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Host.IntegrationTests</RootNamespace>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<!-- No Version attributes here: presumably versions are managed centrally (e.g. Directory.Packages.props); confirm. -->
<PackageReference Include="xunit.v3"/>
<PackageReference Include="Shouldly"/>
<PackageReference Include="Microsoft.NET.Test.Sdk"/>
<!-- NOTE(review): the harness builds WebApplication instances directly instead of using WebApplicationFactory; confirm this reference is still required. -->
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory"/>
<PackageReference Include="Akka.Hosting"/>
<!-- Runner adapter is a private asset so it does not flow to projects referencing this one. -->
<PackageReference Include="xunit.runner.visualstudio">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<!-- Production projects whose extension methods and types the test project references. -->
<ProjectReference Include="..\..\..\src\Server\ZB.MOM.WW.OtOpcUa.Host\ZB.MOM.WW.OtOpcUa.Host.csproj"/>
<ProjectReference Include="..\..\..\src\Core\ZB.MOM.WW.OtOpcUa.Cluster\ZB.MOM.WW.OtOpcUa.Cluster.csproj"/>
<ProjectReference Include="..\..\..\src\Core\ZB.MOM.WW.OtOpcUa.Configuration\ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
<ProjectReference Include="..\..\..\src\Core\ZB.MOM.WW.OtOpcUa.Commons\ZB.MOM.WW.OtOpcUa.Commons.csproj"/>
<ProjectReference Include="..\..\..\src\Server\ZB.MOM.WW.OtOpcUa.Security\ZB.MOM.WW.OtOpcUa.Security.csproj"/>
</ItemGroup>
<ItemGroup>
<!-- NuGet audit findings for these advisories are suppressed for this project. -->
<!-- NOTE(review): the rationale is not recorded here; confirm the suppressions are still needed when packages are upgraded. -->
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-g94r-2vxg-569j"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-h958-fxgg-g7w3"/>
</ItemGroup>
</Project>