using System.Net.Sockets;
using Akka.Actor;
using Akka.Cluster;
using Akka.Hosting;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using ZB.MOM.WW.OtOpcUa.AdminUI;
using ZB.MOM.WW.OtOpcUa.AdminUI.Clients;
using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;
using ZB.MOM.WW.OtOpcUa.Cluster;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.ControlPlane;
using ZB.MOM.WW.OtOpcUa.Host.Health;
using ZB.MOM.WW.OtOpcUa.Runtime;
using ZB.MOM.WW.OtOpcUa.Security;
using ZB.MOM.WW.OtOpcUa.Security.Endpoints;
using ZB.MOM.WW.OtOpcUa.Security.Ldap;

namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;

/// <summary>
/// Spins up two in-process OtOpcUa.Host-equivalent instances that share one configuration
/// database (<see cref="OtOpcUaConfigDbContext"/>) and form a 2-member Akka cluster. Both nodes
/// carry the roles listed in <see cref="TestRoles"/> (admin + driver), matching design §8's
/// failover-test 2-node profile.
/// </summary>
/// <remarks>
/// <para>
/// Default mode uses the EF InMemory provider plus <see cref="StubLdapAuthService"/>. Optional
/// real-infrastructure modes are driven by environment variables (see <see cref="HarnessMode"/>):
/// </para>
/// <list type="bullet">
///   <item><description>
///     <c>OTOPCUA_HARNESS_USE_SQL=1</c> → swap the in-memory DB for SQL Server on
///     <c>localhost:14331</c> (see docker-compose.yml). Each harness gets a unique database name
///     (<c>OtOpcUa_Harness_{guid}</c>) created via <c>EnsureCreated</c> and dropped via
///     <c>EnsureDeleted</c> on dispose.
///   </description></item>
///   <item><description>
///     <c>OTOPCUA_HARNESS_USE_LDAP=1</c> → drop the stub and point the LDAP auth service at
///     OpenLDAP on <c>localhost:3894</c>.
///   </description></item>
/// </list>
/// <para>
/// Why not <c>WebApplicationFactory&lt;Program&gt;</c>? Program.cs reads <c>OTOPCUA_ROLES</c>
/// from the process environment (shared across in-process WAF instances) and writes both Serilog
/// file sinks and the Akka cluster TCP listener to the host process — neither survives two
/// parallel WAFs cleanly. This harness instead replays the Program.cs DI graph from a clean
/// <see cref="WebApplicationBuilder"/> per node with per-node configuration overrides.
/// </para>
/// </remarks>
public sealed class TwoNodeClusterHarness : IAsyncDisposable
{
    /// <summary>Comma-separated cluster roles assigned to both nodes.</summary>
    public const string TestRoles = "admin,driver";

    // Both nodes bind to 127.0.0.1 — ClusterRoleInfo + ConfigPublishCoordinator encode
    // host:port into NodeId so the cluster membership stays distinct on different ports.
    public const string LoopbackHost = "127.0.0.1";

    /// <summary>Number of members a fully formed harness cluster has.</summary>
    private const int ClusterSize = 2;

    private static readonly TimeSpan DefaultFormationTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(200);

    private string? _sqlDbName;
    private string? _sqlConnString;

    /// <summary>Name of the EF InMemory database both nodes share (used when SQL mode is off).</summary>
    public string SharedDbName { get; } = $"two-node-cluster-{Guid.NewGuid():N}";

    /// <summary>Infrastructure mode read from the environment when the harness is constructed.</summary>
    public HarnessMode Mode { get; } = HarnessMode.FromEnvironment();

    /// <summary>The seed node. Set to null by <see cref="DisposeAsync"/>.</summary>
    public WebApplication NodeA { get; private set; } = null!;

    /// <summary>The joining node. Set to null by <see cref="StopNodeBAsync"/> and <see cref="DisposeAsync"/>.</summary>
    public WebApplication NodeB { get; private set; } = null!;

    /// <summary>Akka remoting port of node A (also the seed-node port both nodes join).</summary>
    public int NodeAAkkaPort { get; private set; }

    /// <summary>Akka remoting port of node B; reused by <see cref="RestartNodeBAsync"/>.</summary>
    public int NodeBAkkaPort { get; private set; }

    /// <summary>Node A's actor system.</summary>
    /// <exception cref="InvalidOperationException">Node A has been disposed.</exception>
    public ActorSystem NodeASystem => ResolveActorSystem(NodeA, "A");

    /// <summary>Node B's actor system.</summary>
    /// <exception cref="InvalidOperationException">Node B is stopped or disposed.</exception>
    public ActorSystem NodeBSystem => ResolveActorSystem(NodeB, "B");

    /// <summary>
    /// Boots node A (seed) then node B (joiner) and waits until both see
    /// <see cref="ClusterSize"/> Up members.
    /// </summary>
    /// <param name="formationTimeout">Maximum time to wait for convergence; defaults to 20 seconds.</param>
    /// <returns>A started harness. The caller owns it and must dispose it.</returns>
    /// <exception cref="TimeoutException">The cluster did not form within <paramref name="formationTimeout"/>.</exception>
    /// <remarks>
    /// If any step fails, everything started so far (nodes and the SQL database) is torn down
    /// before the exception propagates — the caller never receives the harness in that case, so
    /// it could not dispose it itself.
    /// </remarks>
    public static async Task<TwoNodeClusterHarness> StartAsync(TimeSpan? formationTimeout = null)
    {
        var harness = new TwoNodeClusterHarness();
        try
        {
            harness.NodeAAkkaPort = AllocateFreePort();
            do
            {
                // The OS may hand back the port it just released for node A; the nodes need distinct ports.
                harness.NodeBAkkaPort = AllocateFreePort();
            }
            while (harness.NodeBAkkaPort == harness.NodeAAkkaPort);

            if (harness.Mode.UseSqlServer)
            {
                harness._sqlDbName = $"OtOpcUa_Harness_{Guid.NewGuid():N}";
                // NOTE(review): test-only credentials for the local SQL Server container; presumably
                // these must match docker-compose.yml — confirm before changing either side.
                harness._sqlConnString =
                    $"Server=localhost,14331;Database={harness._sqlDbName};User Id=sa;Password=OtOpcUa!Harness123;TrustServerCertificate=True;";
                await EnsureSqlSchemaCreatedAsync(harness._sqlConnString);
            }

            // Node A boots first as the seed.
            harness.NodeA = await BuildNodeAsync(harness, NodeRole.Seed);
            harness.NodeB = await BuildNodeAsync(harness, NodeRole.Joiner);

            await WaitForClusterFormationAsync(
                harness.NodeASystem,
                harness.NodeBSystem,
                formationTimeout ?? DefaultFormationTimeout);

            return harness;
        }
        catch
        {
            try
            {
                await harness.DisposeAsync();
            }
            catch (Exception)
            {
                // Swallowed so the original startup failure is the exception the caller sees.
            }

            throw;
        }
    }

    /// <summary>
    /// Stops and disposes node B. The host is stopped before it is disposed so its hosted services
    /// (Kestrel, the Akka actor system) get a graceful shutdown; for Akka this is expected to run
    /// CoordinatedShutdown, including Cluster.Leave (NOTE: Akka.Hosting behavior, not verified here).
    /// Use <see cref="WaitForClusterSizeAsync"/> to wait for node A to observe the departure and
    /// <see cref="RestartNodeBAsync"/> to bring node B back on the same Akka port.
    /// Does nothing if node B is already stopped.
    /// </summary>
    public async Task StopNodeBAsync()
    {
        var node = NodeB;
        if (node is null)
        {
            return;
        }

        // Clear the reference first: ShutdownNodeAsync always disposes, so DisposeAsync must not
        // try to shut this instance down a second time even if stopping it throws.
        NodeB = null!;
        await ShutdownNodeAsync(node);
    }

    /// <summary>
    /// Rebuilds node B on the same Akka port and the same configuration database, then waits for
    /// the cluster to re-converge to <see cref="ClusterSize"/> Up members. Call after
    /// <see cref="StopNodeBAsync"/> to test rejoin.
    /// </summary>
    /// <param name="formationTimeout">Maximum time to wait for convergence; defaults to 20 seconds.</param>
    /// <exception cref="InvalidOperationException">Node B is still running.</exception>
    /// <exception cref="TimeoutException">The cluster did not re-form in time.</exception>
    public async Task RestartNodeBAsync(TimeSpan? formationTimeout = null)
    {
        if (NodeB is not null)
        {
            // A second node B would try to bind the Akka port the running one still holds.
            throw new InvalidOperationException(
                "Node B is still running; call StopNodeBAsync before RestartNodeBAsync.");
        }

        NodeB = await BuildNodeAsync(this, NodeRole.Joiner);
        await WaitForClusterFormationAsync(
            NodeASystem,
            NodeBSystem,
            formationTimeout ?? DefaultFormationTimeout);
    }

    /// <summary>
    /// Polls node A's cluster view until exactly <paramref name="expectedUpMembers"/> members are
    /// <see cref="MemberStatus.Up"/>. Used to assert shrink-after-stop or grow-after-restart.
    /// </summary>
    /// <param name="expectedUpMembers">Exact number of Up members to wait for.</param>
    /// <param name="timeout">Maximum time to wait; the view is checked at least once.</param>
    /// <exception cref="TimeoutException">The count did not match within <paramref name="timeout"/>.</exception>
    public async Task WaitForClusterSizeAsync(int expectedUpMembers, TimeSpan timeout)
    {
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (true)
        {
            var actual = CountUpMembers(NodeASystem);
            if (actual == expectedUpMembers)
            {
                return;
            }

            if (elapsed.Elapsed >= timeout)
            {
                throw new TimeoutException(
                    $"Cluster did not converge to {expectedUpMembers} Up members within {timeout}. Actual={actual}");
            }

            await Task.Delay(PollInterval);
        }
    }

    /// <summary>Which side of the cluster a node being built plays.</summary>
    private enum NodeRole
    {
        /// <summary>Node A: listens on <see cref="NodeAAkkaPort"/>, which both nodes use as the seed.</summary>
        Seed,

        /// <summary>Node B: listens on <see cref="NodeBAkkaPort"/> and joins via node A.</summary>
        Joiner,
    }

    /// <summary>
    /// Builds and starts one node: replays the Program.cs service graph with per-node
    /// configuration overrides and maps the same endpoints.
    /// </summary>
    /// <returns>The started application. If startup fails, it is shut down before the exception propagates.</returns>
    private static async Task<WebApplication> BuildNodeAsync(TwoNodeClusterHarness harness, NodeRole role)
    {
        var akkaPort = role == NodeRole.Seed ? harness.NodeAAkkaPort : harness.NodeBAkkaPort;

        var builder = WebApplication.CreateBuilder(new WebApplicationOptions { Args = [] });
        // Port 0 lets Kestrel pick a free HTTP port, so the two nodes' HTTP listeners never collide.
        builder.WebHost.UseKestrel(o => o.Listen(System.Net.IPAddress.Parse(LoopbackHost), 0));
        builder.Configuration.AddInMemoryCollection(BuildConfigOverrides(harness, akkaPort));

        RegisterNodeServices(builder, harness);

        var app = builder.Build();
        try
        {
            app.UseAuthentication();
            app.UseAuthorization();
            app.MapOtOpcUaAuth();
            app.MapOtOpcUaHubs();
            app.MapOtOpcUaHealth();

            await app.StartAsync();
            return app;
        }
        catch
        {
            // StartAsync may already have started some hosted services before failing.
            try
            {
                await ShutdownNodeAsync(app);
            }
            catch (Exception)
            {
                // Swallowed so the startup failure is the exception the caller sees.
            }

            throw;
        }
    }

    /// <summary>Builds the in-memory configuration layered over the node's default configuration.</summary>
    private static Dictionary<string, string?> BuildConfigOverrides(TwoNodeClusterHarness harness, int akkaPort)
    {
        var overrides = new Dictionary<string, string?>
        {
            // In InMemory mode this is a placeholder; presumably only its presence matters — confirm.
            ["ConnectionStrings:ConfigDb"] = harness._sqlConnString
                ?? "Server=test;Database=test;Trusted_Connection=True;TrustServerCertificate=True;",
            ["Cluster:Hostname"] = LoopbackHost,
            ["Cluster:Port"] = akkaPort.ToString(System.Globalization.CultureInfo.InvariantCulture),
            ["Cluster:PublicHostname"] = LoopbackHost,
            // Both nodes point at node A as their (only) seed.
            ["Cluster:SeedNodes:0"] = $"akka.tcp://otopcua@{LoopbackHost}:{harness.NodeAAkkaPort}",
            ["Security:Jwt:SigningKey"] = "two-node-harness-test-signing-key-with-enough-bytes-for-hs256",
            ["Security:Jwt:Issuer"] = "otopcua-test",
            ["Security:Jwt:Audience"] = "otopcua-test",
        };

        // Derive Cluster:Roles:N from TestRoles so the advertised constant and the config agree.
        var roles = TestRoles.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
        for (var i = 0; i < roles.Length; i++)
        {
            overrides[$"Cluster:Roles:{i}"] = roles[i];
        }

        if (harness.Mode.UseRealLdap)
        {
            overrides["Authentication:Ldap:Enabled"] = "true";
            overrides["Authentication:Ldap:Server"] = "localhost";
            overrides["Authentication:Ldap:Port"] = "3894";
            overrides["Authentication:Ldap:UseTls"] = "false";
            overrides["Authentication:Ldap:AllowInsecureLdap"] = "true";
            overrides["Authentication:Ldap:SearchBase"] = "dc=lmxopcua,dc=local";
            overrides["Authentication:Ldap:ServiceAccountDn"] = "cn=admin,dc=lmxopcua,dc=local";
            overrides["Authentication:Ldap:ServiceAccountPassword"] = "ldapadmin";
        }

        return overrides;
    }

    /// <summary>Registers the Program.cs service graph, swapping the DB provider and LDAP per <see cref="Mode"/>.</summary>
    private static void RegisterNodeServices(WebApplicationBuilder builder, TwoNodeClusterHarness harness)
    {
        // Provider swap: same DbContext type wired to either InMemory or SqlServer at startup.
        if (harness.Mode.UseSqlServer)
        {
            builder.Services.AddDbContextFactory<OtOpcUaConfigDbContext>(opt => opt.UseSqlServer(harness._sqlConnString));
            builder.Services.AddDbContext<OtOpcUaConfigDbContext>(opt => opt.UseSqlServer(harness._sqlConnString));
        }
        else
        {
            builder.Services.AddDbContextFactory<OtOpcUaConfigDbContext>(opt => opt.UseInMemoryDatabase(harness.SharedDbName));
            builder.Services.AddDbContext<OtOpcUaConfigDbContext>(opt => opt.UseInMemoryDatabase(harness.SharedDbName));
        }

        builder.Services.AddOtOpcUaCluster(builder.Configuration);
        builder.Services.AddAkka("otopcua", (ab, sp) =>
        {
            ab.WithOtOpcUaClusterBootstrap(sp);
            ab.WithOtOpcUaControlPlaneSingletons();
            ab.WithOtOpcUaRuntimeActors();
        });

        builder.Services.AddOtOpcUaAuth(builder.Configuration);
        if (!harness.Mode.UseRealLdap)
        {
            // Added after AddOtOpcUaAuth so it is the registration resolved for a single ILdapAuthService.
            builder.Services.AddSingleton<ILdapAuthService, StubLdapAuthService>();
        }

        builder.Services.AddAdminUI();
        builder.Services.AddSignalR();
        builder.Services.AddOtOpcUaAdminClients();
        builder.Services.AddOtOpcUaHealth();
    }

    /// <summary>
    /// Stops the host, then disposes it. Disposing a generic host does not invoke
    /// <c>IHostedService.StopAsync</c>, so stopping first is what gives hosted services a graceful shutdown.
    /// The host is disposed even if stopping throws; the stop exception then propagates.
    /// </summary>
    private static async Task ShutdownNodeAsync(WebApplication node)
    {
        try
        {
            await node.StopAsync();
        }
        finally
        {
            await node.DisposeAsync();
        }
    }

    /// <summary>Returns the node's actor system, or throws a descriptive error when the node is gone.</summary>
    private static ActorSystem ResolveActorSystem(WebApplication? node, string label) =>
        node?.Services.GetRequiredService<ActorSystem>()
        ?? throw new InvalidOperationException($"Node {label} is not running.");

    /// <summary>Counts members the given system's cluster view reports as <see cref="MemberStatus.Up"/>.</summary>
    private static int CountUpMembers(ActorSystem system) =>
        Akka.Cluster.Cluster.Get(system).State.Members.Count(m => m.Status == MemberStatus.Up);

    /// <summary>Creates a context bound to SQL Server, outside the DI container.</summary>
    private static OtOpcUaConfigDbContext CreateSqlContext(string connString) =>
        new(new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
            .UseSqlServer(connString)
            .Options);

    /// <summary>Creates the harness database and its schema on SQL Server.</summary>
    private static async Task EnsureSqlSchemaCreatedAsync(string connString)
    {
        await using var db = CreateSqlContext(connString);
        // EnsureCreated bypasses migrations but builds the model in one shot — fine for tests.
        // Production deployments use Migrate-To-V2.ps1 to apply EF migrations.
        await db.Database.EnsureCreatedAsync();
    }

    /// <summary>Drops the harness database from SQL Server if it exists.</summary>
    private static async Task DropSqlDatabaseAsync(string connString)
    {
        await using var db = CreateSqlContext(connString);
        await db.Database.EnsureDeletedAsync();
    }

    /// <summary>
    /// Polls both nodes until each sees at least <see cref="ClusterSize"/> Up members.
    /// The views are checked at least once even when <paramref name="timeout"/> is zero.
    /// </summary>
    /// <exception cref="TimeoutException">Either view did not reach the size in time.</exception>
    private static async Task WaitForClusterFormationAsync(ActorSystem a, ActorSystem b, TimeSpan timeout)
    {
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (true)
        {
            var aUp = CountUpMembers(a);
            var bUp = CountUpMembers(b);
            if (aUp >= ClusterSize && bUp >= ClusterSize)
            {
                return;
            }

            if (elapsed.Elapsed >= timeout)
            {
                throw new TimeoutException(
                    $"Cluster did not form within {timeout}. " +
                    $"A up={aUp}, " +
                    $"B up={bUp}");
            }

            await Task.Delay(PollInterval);
        }
    }

    /// <summary>
    /// Asks the OS for a free loopback TCP port by binding to port 0, then releases it so a node
    /// can claim it. Inherently racy (another process could take the port in between), which is
    /// acceptable for tests.
    /// </summary>
    private static int AllocateFreePort()
    {
        using var listener = new TcpListener(System.Net.IPAddress.Parse(LoopbackHost), 0);
        listener.Start();
        var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
        listener.Stop();
        return port;
    }

    /// <summary>
    /// Shuts down node B, then node A, then drops the SQL database (SQL mode only).
    /// Each step runs even if an earlier one throws, and calling this more than once is safe.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        WebApplication? nodeB = NodeB;
        WebApplication? nodeA = NodeA;
        NodeB = null!;
        NodeA = null!;

        try
        {
            try
            {
                if (nodeB is not null)
                {
                    await ShutdownNodeAsync(nodeB);
                }
            }
            finally
            {
                if (nodeA is not null)
                {
                    await ShutdownNodeAsync(nodeA);
                }
            }
        }
        finally
        {
            var connString = _sqlConnString;
            _sqlConnString = null;
            if (connString is not null)
            {
                try
                {
                    await DropSqlDatabaseAsync(connString);
                }
                catch (Exception)
                {
                    // Best-effort cleanup: the database name is unique per harness, so an orphan
                    // cannot collide with later runs.
                }
            }
        }
    }

    /// <summary>Captures the env-var driven harness mode at construction time.</summary>
    /// <param name="UseSqlServer">True when <c>OTOPCUA_HARNESS_USE_SQL</c> is exactly <c>"1"</c>.</param>
    /// <param name="UseRealLdap">True when <c>OTOPCUA_HARNESS_USE_LDAP</c> is exactly <c>"1"</c>.</param>
    public sealed record HarnessMode(bool UseSqlServer, bool UseRealLdap)
    {
        /// <summary>Reads both flags from the current process environment.</summary>
        public static HarnessMode FromEnvironment() => new(
            UseSqlServer: Environment.GetEnvironmentVariable("OTOPCUA_HARNESS_USE_SQL") == "1",
            UseRealLdap: Environment.GetEnvironmentVariable("OTOPCUA_HARNESS_USE_LDAP") == "1");
    }

    /// <summary>
    /// LDAP stand-in used when real LDAP is off. Authentication succeeds only when the password is
    /// exactly <c>"valid-password"</c>. The display name and username echo the input, and the
    /// result always lists group and role <c>FleetAdmin</c>, even on failure.
    /// </summary>
    private sealed class StubLdapAuthService : ILdapAuthService
    {
        public Task<LdapAuthResult> AuthenticateAsync(string username, string password, CancellationToken ct = default) =>
            Task.FromResult(new LdapAuthResult(
                Success: password == "valid-password",
                DisplayName: username,
                Username: username,
                Groups: ["FleetAdmin"],
                Roles: ["FleetAdmin"],
                Error: null));
    }
}