From f184f8ed1bd9b26ade01591fb87b85a36b33e2c0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 04:31:05 -0400 Subject: [PATCH] feat(cluster): AkkaHostedService and DI extension --- .../AkkaClusterOptions.cs | 26 +++++ .../AkkaHostedService.cs | 97 +++++++++++++++++++ .../ServiceCollectionExtensions.cs | 28 ++++++ 3 files changed, 151 insertions(+) create mode 100644 src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaClusterOptions.cs create mode 100644 src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaHostedService.cs create mode 100644 src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaClusterOptions.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaClusterOptions.cs new file mode 100644 index 0000000..89a7cc7 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaClusterOptions.cs @@ -0,0 +1,26 @@ +namespace ZB.MOM.WW.OtOpcUa.Cluster; + +public sealed class AkkaClusterOptions +{ + public const string SectionName = "Cluster"; + + public string SystemName { get; set; } = "otopcua"; + public string Hostname { get; set; } = "0.0.0.0"; + public int Port { get; set; } = 4053; + + /// + /// Hostname advertised in cluster gossip. Must be reachable by other nodes. + /// In docker-compose this is the container DNS name; in bare metal it's the + /// host's stable LAN address. + /// + public string PublicHostname { get; set; } = "127.0.0.1"; + + public string[] SeedNodes { get; set; } = Array.Empty(); + + /// + /// Cluster roles for this node. When empty the role list comes from + /// OTOPCUA_ROLES via . Allowed values: + /// admin, driver, dev. + /// + public string[] Roles { get; set; } = Array.Empty(); +} diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaHostedService.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaHostedService.cs new file mode 100644 index 0000000..86e359f --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/AkkaHostedService.cs @@ -0,0 +1,97 @@ +using Akka.Actor; +using Akka.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace ZB.MOM.WW.OtOpcUa.Cluster; + +/// +/// Starts the local , applies the embedded HOCON plus an overlay +/// generated from , and joins the cluster. On shutdown, +/// runs CoordinatedShutdown with the ClusterLeavingReason so the local node +/// leaves the cluster cleanly before the process exits. +/// +public sealed class AkkaHostedService : IHostedService +{ + private readonly AkkaClusterOptions _options; + private readonly ILogger _logger; + private ActorSystem? _actorSystem; + + public AkkaHostedService(IOptions options, ILogger logger) + { + _options = options.Value; + _logger = logger; + } + + public ActorSystem ActorSystem => + _actorSystem ?? throw new InvalidOperationException( + "ActorSystem requested before AkkaHostedService.StartAsync ran."); + + public Task StartAsync(CancellationToken cancellationToken) + { + var overlay = BuildOverlay(_options); + var baseConfig = ConfigurationFactory.ParseString(HoconLoader.LoadBaseConfig()); + var config = ConfigurationFactory.ParseString(overlay).WithFallback(baseConfig); + + _logger.LogInformation( + "Starting ActorSystem '{System}' on {Host}:{Port} with roles=[{Roles}]", + _options.SystemName, _options.PublicHostname, _options.Port, + string.Join(",", _options.Roles)); + + _actorSystem = ActorSystem.Create(_options.SystemName, config); + + if (_options.SeedNodes.Length > 0) + { + var seeds = _options.SeedNodes.Select(Address.Parse).ToList(); + Akka.Cluster.Cluster.Get(_actorSystem).JoinSeedNodes(seeds); + } + + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if (_actorSystem is null) return; + + _logger.LogInformation("Initiating cluster-leave CoordinatedShutdown"); + var shutdown = CoordinatedShutdown.Get(_actorSystem); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(TimeSpan.FromSeconds(30)); + + try + { + await shutdown.Run(CoordinatedShutdown.ClusterLeavingReason.Instance) + .WaitAsync(cts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + _logger.LogWarning("Cluster leave timed out after 30s; forcing terminate"); + await _actorSystem.Terminate().ConfigureAwait(false); + } + } + + private static string BuildOverlay(AkkaClusterOptions o) + { + var seeds = string.Join(",", o.SeedNodes.Select(Quote)); + var roles = string.Join(",", o.Roles.Select(Quote)); + return $@" +akka {{ + remote.dot-netty.tcp {{ + hostname = {Quote(o.Hostname)} + port = {o.Port} + public-hostname = {Quote(o.PublicHostname)} + }} + cluster {{ + seed-nodes = [{seeds}] + roles = [{roles}] + }} +}}"; + } + + private static string Quote(string? value) + { + var escaped = (value ?? string.Empty).Replace("\\", "\\\\").Replace("\"", "\\\""); + return $"\"{escaped}\""; + } +} diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..7d22e4b --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ServiceCollectionExtensions.cs @@ -0,0 +1,28 @@ +using Akka.Actor; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; + +namespace ZB.MOM.WW.OtOpcUa.Cluster; + +public static class ServiceCollectionExtensions +{ + /// + /// Registers the Akka cluster hosted service and exposes and + /// as singletons resolved from it. Call after binding + /// OTOPCUA_ROLES into AkkaClusterOptions.Roles via the calling Program.cs. + /// + public static IServiceCollection AddOtOpcUaCluster(this IServiceCollection services, IConfiguration configuration) + { + services.AddOptions() + .Bind(configuration.GetSection(AkkaClusterOptions.SectionName)); + + services.AddSingleton(); + services.AddHostedService(sp => sp.GetRequiredService()); + services.AddSingleton(sp => sp.GetRequiredService().ActorSystem); + services.AddSingleton(); + + return services; + } +}