diff --git a/ZB.MOM.WW.OtOpcUa.slnx b/ZB.MOM.WW.OtOpcUa.slnx index 2da7dc3..124972d 100644 --- a/ZB.MOM.WW.OtOpcUa.slnx +++ b/ZB.MOM.WW.OtOpcUa.slnx @@ -62,6 +62,7 @@ + diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaHostedService.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaHostedService.cs deleted file mode 100644 index 86e359f..0000000 --- a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaHostedService.cs +++ /dev/null @@ -1,97 +0,0 @@ -using Akka.Actor; -using Akka.Configuration; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace ZB.MOM.WW.OtOpcUa.Cluster; - -/// -/// Starts the local , applies the embedded HOCON plus an overlay -/// generated from , and joins the cluster. On shutdown, -/// runs CoordinatedShutdown with the ClusterLeavingReason so the local node -/// leaves the cluster cleanly before the process exits. -/// -public sealed class AkkaHostedService : IHostedService -{ - private readonly AkkaClusterOptions _options; - private readonly ILogger _logger; - private ActorSystem? _actorSystem; - - public AkkaHostedService(IOptions options, ILogger logger) - { - _options = options.Value; - _logger = logger; - } - - public ActorSystem ActorSystem => - _actorSystem ?? throw new InvalidOperationException( - "ActorSystem requested before AkkaHostedService.StartAsync ran."); - - public Task StartAsync(CancellationToken cancellationToken) - { - var overlay = BuildOverlay(_options); - var baseConfig = ConfigurationFactory.ParseString(HoconLoader.LoadBaseConfig()); - var config = ConfigurationFactory.ParseString(overlay).WithFallback(baseConfig); - - _logger.LogInformation( - "Starting ActorSystem '{System}' on {Host}:{Port} with roles=[{Roles}]", - _options.SystemName, _options.PublicHostname, _options.Port, - string.Join(",", _options.Roles)); - - _actorSystem = ActorSystem.Create(_options.SystemName, config); - - if (_options.SeedNodes.Length > 0) - { - var seeds = _options.SeedNodes.Select(Address.Parse).ToList(); - Akka.Cluster.Cluster.Get(_actorSystem).JoinSeedNodes(seeds); - } - - return Task.CompletedTask; - } - - public async Task StopAsync(CancellationToken cancellationToken) - { - if (_actorSystem is null) return; - - _logger.LogInformation("Initiating cluster-leave CoordinatedShutdown"); - var shutdown = CoordinatedShutdown.Get(_actorSystem); - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(TimeSpan.FromSeconds(30)); - - try - { - await shutdown.Run(CoordinatedShutdown.ClusterLeavingReason.Instance) - .WaitAsync(cts.Token).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - _logger.LogWarning("Cluster leave timed out after 30s; forcing terminate"); - await _actorSystem.Terminate().ConfigureAwait(false); - } - } - - private static string BuildOverlay(AkkaClusterOptions o) - { - var seeds = string.Join(",", o.SeedNodes.Select(Quote)); - var roles = string.Join(",", o.Roles.Select(Quote)); - return $@" -akka {{ - remote.dot-netty.tcp {{ - hostname = {Quote(o.Hostname)} - port = {o.Port} - public-hostname = {Quote(o.PublicHostname)} - }} - cluster {{ - seed-nodes = [{seeds}] - roles = [{roles}] - }} -}}"; - } - - private static string Quote(string? value) - { - var escaped = (value ?? string.Empty).Replace("\\", "\\\\").Replace("\"", "\\\""); - return $"\"{escaped}\""; - } -} diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs index 7d22e4b..eb743d0 100644 --- a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs @@ -1,7 +1,9 @@ -using Akka.Actor; +using Akka.Cluster.Hosting; +using Akka.Hosting; +using Akka.Remote.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; namespace ZB.MOM.WW.OtOpcUa.Cluster; @@ -9,20 +11,57 @@ namespace ZB.MOM.WW.OtOpcUa.Cluster; public static class ServiceCollectionExtensions { /// - /// Registers the Akka cluster hosted service and exposes and - /// as singletons resolved from it. Call after binding - /// OTOPCUA_ROLES into AkkaClusterOptions.Roles via the calling Program.cs. + /// Binds and registers . The + /// actual ActorSystem + cluster bootstrap is layered on inside the host's AddAkka(...) + /// configurator via — keeping the entire Akka graph + /// under Akka.Hosting's management so cluster singletons land on the same ActorSystem. /// public static IServiceCollection AddOtOpcUaCluster(this IServiceCollection services, IConfiguration configuration) { services.AddOptions() .Bind(configuration.GetSection(AkkaClusterOptions.SectionName)); - services.AddSingleton(); - services.AddHostedService(sp => sp.GetRequiredService()); - services.AddSingleton(sp => sp.GetRequiredService().ActorSystem); services.AddSingleton(); return services; } + + /// + /// Configures the Akka.Hosting builder with the embedded OtOpcUa HOCON (split-brain resolver, + /// pinned dispatcher, failure detector tuning) + remote endpoint + cluster bootstrap derived + /// from . + /// + /// Wire from Program.cs: + /// + /// services.AddAkka("otopcua", (ab, sp) => + /// { + /// ab.WithOtOpcUaClusterBootstrap(sp); + /// if (hasAdmin) ab.WithOtOpcUaControlPlaneSingletons(); + /// if (hasDriver) ab.WithOtOpcUaRuntimeActors(); + /// }); + /// + /// + public static AkkaConfigurationBuilder WithOtOpcUaClusterBootstrap( + this AkkaConfigurationBuilder builder, + IServiceProvider serviceProvider) + { + var options = serviceProvider.GetRequiredService>().Value; + + builder.AddHocon(HoconLoader.LoadBaseConfig(), HoconAddMode.Append); + + builder.WithRemoting(new RemoteOptions + { + HostName = options.Hostname, + Port = options.Port, + PublicHostName = options.PublicHostname, + }); + + builder.WithClustering(new ClusterOptions + { + SeedNodes = options.SeedNodes, + Roles = options.Roles, + }); + + return builder; + } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs index a7e0342..a655b9f 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs @@ -40,8 +40,9 @@ builder.Services.AddOtOpcUaCluster(builder.Configuration); // Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder // from inside the configurator lambda. AddAkka spins the ActorSystem at host start. -builder.Services.AddAkka("otopcua", (ab, _) => +builder.Services.AddAkka("otopcua", (ab, sp) => { + ab.WithOtOpcUaClusterBootstrap(sp); if (hasAdmin) ab.WithOtOpcUaControlPlaneSingletons(); if (hasDriver) diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/ClusterFormationTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/ClusterFormationTests.cs new file mode 100644 index 0000000..05ebff3 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/ClusterFormationTests.cs @@ -0,0 +1,43 @@ +using Akka.Cluster; +using Shouldly; +using Xunit; + +namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; + +/// +/// Smoke test: verifies boots two nodes and they form +/// a 2-member cluster with the expected role topology. Failover + deploy scenarios layer +/// on top in Task 59. +/// +public sealed class ClusterFormationTests +{ + [Fact] + public async Task Two_nodes_form_a_2_member_cluster() + { + await using var harness = await TwoNodeClusterHarness.StartAsync(); + + var aCluster = Akka.Cluster.Cluster.Get(harness.NodeASystem); + var bCluster = Akka.Cluster.Cluster.Get(harness.NodeBSystem); + + aCluster.State.Members.Count(m => m.Status == MemberStatus.Up).ShouldBe(2); + bCluster.State.Members.Count(m => m.Status == MemberStatus.Up).ShouldBe(2); + + var aRoles = aCluster.State.Members.SelectMany(m => m.Roles).Distinct().ToHashSet(); + aRoles.ShouldContain("admin"); + aRoles.ShouldContain("driver"); + } + + [Fact] + public async Task Both_nodes_see_each_other_as_role_members() + { + await using var harness = await TwoNodeClusterHarness.StartAsync(); + + var aCluster = Akka.Cluster.Cluster.Get(harness.NodeASystem); + aCluster.State.Members + .Where(m => m.Roles.Contains("driver") && m.Status == MemberStatus.Up) + .Count().ShouldBe(2); + aCluster.State.Members + .Where(m => m.Roles.Contains("admin") && m.Status == MemberStatus.Up) + .Count().ShouldBe(2); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs new file mode 100644 index 0000000..847c03d --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/TwoNodeClusterHarness.cs @@ -0,0 +1,176 @@ +using System.Net.Sockets; +using Akka.Actor; +using Akka.Cluster; +using Akka.Hosting; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using ZB.MOM.WW.OtOpcUa.AdminUI; +using ZB.MOM.WW.OtOpcUa.AdminUI.Clients; +using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; +using ZB.MOM.WW.OtOpcUa.Cluster; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.ControlPlane; +using ZB.MOM.WW.OtOpcUa.Host.Health; +using ZB.MOM.WW.OtOpcUa.Runtime; +using ZB.MOM.WW.OtOpcUa.Security; +using ZB.MOM.WW.OtOpcUa.Security.Endpoints; +using ZB.MOM.WW.OtOpcUa.Security.Ldap; + +namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; + +/// +/// Spins up two in-process OtOpcUa.Host-equivalent instances +/// that share an in-memory and form a 2-member Akka cluster. +/// Both nodes carry the admin + driver roles, matching design §8's failover-test +/// 2-node profile. +/// +/// Why not WebApplicationFactory<Program>? +/// Program.cs reads OTOPCUA_ROLES from process env (shared across in-process WAF +/// instances) and writes both Serilog file sinks + Akka cluster TCP listener to the host +/// process — neither survives two parallel WAFs cleanly. This harness instead replays the +/// Program.cs DI graph from a clean per node with +/// per-node config overrides. The production wiring is the same set of extensions +/// (, +/// , +/// , , +/// , +/// ). +/// +public sealed class TwoNodeClusterHarness : IAsyncDisposable +{ + public const string TestRoles = "admin,driver"; + public static readonly string SharedDbName = $"two-node-cluster-{Guid.NewGuid():N}"; + + public WebApplication NodeA { get; private set; } = null!; + public WebApplication NodeB { get; private set; } = null!; + + public int NodeAAkkaPort { get; private set; } + public int NodeBAkkaPort { get; private set; } + + public ActorSystem NodeASystem => NodeA.Services.GetRequiredService(); + public ActorSystem NodeBSystem => NodeB.Services.GetRequiredService(); + + /// Boots both nodes and waits up to for cluster convergence. + public static async Task StartAsync(TimeSpan? formationTimeout = null) + { + var harness = new TwoNodeClusterHarness(); + harness.NodeAAkkaPort = AllocateFreePort(); + harness.NodeBAkkaPort = AllocateFreePort(); + + // Node A boots first as the seed. + harness.NodeA = await BuildNodeAsync( + akkaPort: harness.NodeAAkkaPort, + seedAkkaPort: harness.NodeAAkkaPort, + dbName: SharedDbName); + + harness.NodeB = await BuildNodeAsync( + akkaPort: harness.NodeBAkkaPort, + seedAkkaPort: harness.NodeAAkkaPort, + dbName: SharedDbName); + + await WaitForClusterFormationAsync( + harness.NodeASystem, + harness.NodeBSystem, + formationTimeout ?? TimeSpan.FromSeconds(20)); + + return harness; + } + + private static async Task BuildNodeAsync(int akkaPort, int seedAkkaPort, string dbName) + { + var builder = WebApplication.CreateBuilder(new WebApplicationOptions { Args = [] }); + + builder.WebHost.UseKestrel(o => o.Listen(System.Net.IPAddress.Loopback, 0)); + builder.Configuration.AddInMemoryCollection(new Dictionary + { + ["ConnectionStrings:ConfigDb"] = "Server=test;Database=test;Trusted_Connection=True;TrustServerCertificate=True;", + ["Cluster:Hostname"] = "127.0.0.1", + ["Cluster:Port"] = akkaPort.ToString(), + ["Cluster:PublicHostname"] = "127.0.0.1", + ["Cluster:SeedNodes:0"] = $"akka.tcp://otopcua@127.0.0.1:{seedAkkaPort}", + ["Cluster:Roles:0"] = "admin", + ["Cluster:Roles:1"] = "driver", + ["Security:Jwt:SigningKey"] = "two-node-harness-test-signing-key-with-enough-bytes-for-hs256", + ["Security:Jwt:Issuer"] = "otopcua-test", + ["Security:Jwt:Audience"] = "otopcua-test", + }); + + // Replicate Program.cs role wiring with the harness-shared in-memory ConfigDb. + builder.Services.AddDbContextFactory(opt => opt.UseInMemoryDatabase(dbName)); + builder.Services.AddDbContext(opt => opt.UseInMemoryDatabase(dbName)); + builder.Services.AddOtOpcUaCluster(builder.Configuration); + + builder.Services.AddAkka("otopcua", (ab, sp) => + { + ab.WithOtOpcUaClusterBootstrap(sp); + ab.WithOtOpcUaControlPlaneSingletons(); + ab.WithOtOpcUaRuntimeActors(); + }); + + builder.Services.AddOtOpcUaAuth(builder.Configuration); + builder.Services.AddSingleton(); + builder.Services.AddAdminUI(); + builder.Services.AddSignalR(); + builder.Services.AddOtOpcUaAdminClients(); + builder.Services.AddOtOpcUaHealth(); + + var app = builder.Build(); + app.UseAuthentication(); + app.UseAuthorization(); + app.MapOtOpcUaAuth(); + app.MapOtOpcUaHubs(); + app.MapOtOpcUaHealth(); + + await app.StartAsync(); + return app; + } + + private static async Task WaitForClusterFormationAsync(ActorSystem a, ActorSystem b, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + var aMembers = Akka.Cluster.Cluster.Get(a).State.Members + .Where(m => m.Status == MemberStatus.Up).ToArray(); + var bMembers = Akka.Cluster.Cluster.Get(b).State.Members + .Where(m => m.Status == MemberStatus.Up).ToArray(); + if (aMembers.Length >= 2 && bMembers.Length >= 2) return; + await Task.Delay(200); + } + throw new TimeoutException( + $"Cluster did not form within {timeout}. " + + $"A up={Akka.Cluster.Cluster.Get(a).State.Members.Count(m => m.Status == MemberStatus.Up)}, " + + $"B up={Akka.Cluster.Cluster.Get(b).State.Members.Count(m => m.Status == MemberStatus.Up)}"); + } + + private static int AllocateFreePort() + { + using var listener = new TcpListener(System.Net.IPAddress.Loopback, 0); + listener.Start(); + var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); + return port; + } + + public async ValueTask DisposeAsync() + { + if (NodeB is not null) await NodeB.DisposeAsync(); + if (NodeA is not null) await NodeA.DisposeAsync(); + } + + private sealed class StubLdapAuthService : ILdapAuthService + { + public Task AuthenticateAsync(string username, string password, CancellationToken ct = default) + => Task.FromResult(new LdapAuthResult( + Success: password == "valid-password", + DisplayName: username, + Username: username, + Groups: ["FleetAdmin"], + Roles: ["FleetAdmin"], + Error: null)); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests.csproj b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests.csproj new file mode 100644 index 0000000..06bb70b --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests.csproj @@ -0,0 +1,36 @@ + + + + false + true + ZB.MOM.WW.OtOpcUa.Host.IntegrationTests + true + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + + +