feat(cluster): AkkaHostedService and DI extension

This commit is contained in:
Joseph Doherty
2026-05-26 04:31:05 -04:00
parent 3d0f4dc168
commit f184f8ed1b
3 changed files with 151 additions and 0 deletions

View File

@@ -0,0 +1,26 @@
namespace ZB.MOM.WW.OtOpcUa.Cluster;
public sealed class AkkaClusterOptions
{
public const string SectionName = "Cluster";
public string SystemName { get; set; } = "otopcua";
public string Hostname { get; set; } = "0.0.0.0";
public int Port { get; set; } = 4053;
/// <summary>
/// Hostname advertised in cluster gossip. Must be reachable by other nodes.
/// In docker-compose this is the container DNS name; in bare metal it's the
/// host's stable LAN address.
/// </summary>
public string PublicHostname { get; set; } = "127.0.0.1";
public string[] SeedNodes { get; set; } = Array.Empty<string>();
/// <summary>
/// Cluster roles for this node. When empty the role list comes from
/// <c>OTOPCUA_ROLES</c> via <see cref="RoleParser"/>. Allowed values:
/// <c>admin</c>, <c>driver</c>, <c>dev</c>.
/// </summary>
public string[] Roles { get; set; } = Array.Empty<string>();
}

View File

@@ -0,0 +1,97 @@
using Akka.Actor;
using Akka.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ZB.MOM.WW.OtOpcUa.Cluster;
/// <summary>
/// Starts the local <see cref="ActorSystem"/>, applies the embedded HOCON plus an overlay
/// generated from <see cref="AkkaClusterOptions"/>, and joins the cluster. On shutdown,
/// runs <c>CoordinatedShutdown</c> with the <c>ClusterLeavingReason</c> so the local node
/// leaves the cluster cleanly before the process exits.
/// </summary>
public sealed class AkkaHostedService : IHostedService
{
private readonly AkkaClusterOptions _options;
private readonly ILogger<AkkaHostedService> _logger;
private ActorSystem? _actorSystem;
public AkkaHostedService(IOptions<AkkaClusterOptions> options, ILogger<AkkaHostedService> logger)
{
_options = options.Value;
_logger = logger;
}
public ActorSystem ActorSystem =>
_actorSystem ?? throw new InvalidOperationException(
"ActorSystem requested before AkkaHostedService.StartAsync ran.");
public Task StartAsync(CancellationToken cancellationToken)
{
var overlay = BuildOverlay(_options);
var baseConfig = ConfigurationFactory.ParseString(HoconLoader.LoadBaseConfig());
var config = ConfigurationFactory.ParseString(overlay).WithFallback(baseConfig);
_logger.LogInformation(
"Starting ActorSystem '{System}' on {Host}:{Port} with roles=[{Roles}]",
_options.SystemName, _options.PublicHostname, _options.Port,
string.Join(",", _options.Roles));
_actorSystem = ActorSystem.Create(_options.SystemName, config);
if (_options.SeedNodes.Length > 0)
{
var seeds = _options.SeedNodes.Select(Address.Parse).ToList();
Akka.Cluster.Cluster.Get(_actorSystem).JoinSeedNodes(seeds);
}
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_actorSystem is null) return;
_logger.LogInformation("Initiating cluster-leave CoordinatedShutdown");
var shutdown = CoordinatedShutdown.Get(_actorSystem);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(30));
try
{
await shutdown.Run(CoordinatedShutdown.ClusterLeavingReason.Instance)
.WaitAsync(cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.LogWarning("Cluster leave timed out after 30s; forcing terminate");
await _actorSystem.Terminate().ConfigureAwait(false);
}
}
private static string BuildOverlay(AkkaClusterOptions o)
{
var seeds = string.Join(",", o.SeedNodes.Select(Quote));
var roles = string.Join(",", o.Roles.Select(Quote));
return $@"
akka {{
remote.dot-netty.tcp {{
hostname = {Quote(o.Hostname)}
port = {o.Port}
public-hostname = {Quote(o.PublicHostname)}
}}
cluster {{
seed-nodes = [{seeds}]
roles = [{roles}]
}}
}}";
}
private static string Quote(string? value)
{
var escaped = (value ?? string.Empty).Replace("\\", "\\\\").Replace("\"", "\\\"");
return $"\"{escaped}\"";
}
}

View File

@@ -0,0 +1,28 @@
using Akka.Actor;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
namespace ZB.MOM.WW.OtOpcUa.Cluster;
public static class ServiceCollectionExtensions
{
/// <summary>
/// Registers the Akka cluster hosted service and exposes <see cref="ActorSystem"/> and
/// <see cref="IClusterRoleInfo"/> as singletons resolved from it. Call after binding
/// <c>OTOPCUA_ROLES</c> into <c>AkkaClusterOptions.Roles</c> via the calling Program.cs.
/// </summary>
public static IServiceCollection AddOtOpcUaCluster(this IServiceCollection services, IConfiguration configuration)
{
services.AddOptions<AkkaClusterOptions>()
.Bind(configuration.GetSection(AkkaClusterOptions.SectionName));
services.AddSingleton<AkkaHostedService>();
services.AddHostedService(sp => sp.GetRequiredService<AkkaHostedService>());
services.AddSingleton<ActorSystem>(sp => sp.GetRequiredService<AkkaHostedService>().ActorSystem);
services.AddSingleton<IClusterRoleInfo, ClusterRoleInfo>();
return services;
}
}