feat(cluster): ClusterRoleInfo wraps Akka.Cluster for app-facing role queries

This commit is contained in:
Joseph Doherty
2026-05-26 04:31:07 -04:00
parent dfb06368cd
commit c217c49f69

View File

@@ -0,0 +1,182 @@
using Akka.Actor;
using Akka.Cluster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Cluster;
/// <summary>
/// Thread-safe live view of cluster membership and role topology. Subscribes to
/// <see cref="ClusterEvent.IMemberEvent"/>, <see cref="ClusterEvent.RoleLeaderChanged"/>, and
/// <see cref="ClusterEvent.LeaderChanged"/> through an internal subscriber actor and keeps
/// a snapshot of role-to-members + role-to-leader. The CLR-facing event surface is
/// <see cref="IClusterRoleInfo.RoleLeaderChanged"/>.
/// </summary>
public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
{
private readonly Akka.Cluster.Cluster _cluster;
private readonly ILogger<ClusterRoleInfo> _logger;
private readonly CommonsNodeId _localNode;
private readonly HashSet<string> _localRoles;
private readonly object _lock = new();
private readonly Dictionary<string, Member?> _roleLeaders = new(StringComparer.Ordinal);
private readonly Dictionary<string, HashSet<Member>> _membersByRole = new(StringComparer.Ordinal);
private IActorRef? _subscriber;
public ClusterRoleInfo(ActorSystem system, IOptions<AkkaClusterOptions> options, ILogger<ClusterRoleInfo> logger)
{
_cluster = Akka.Cluster.Cluster.Get(system);
_logger = logger;
_localNode = CommonsNodeId.Parse(options.Value.PublicHostname);
_localRoles = new HashSet<string>(options.Value.Roles, StringComparer.Ordinal);
SeedFromCurrentState();
_subscriber = system.ActorOf(Props.Create(() => new SubscriberActor(this)), "clusterroleinfo-subscriber");
}
public CommonsNodeId LocalNode => _localNode;
public IReadOnlySet<string> LocalRoles => _localRoles;
public bool HasRole(string role) => _localRoles.Contains(role);
public IReadOnlyList<CommonsNodeId> MembersWithRole(string role)
{
lock (_lock)
{
if (!_membersByRole.TryGetValue(role, out var members)) return Array.Empty<CommonsNodeId>();
return members
.Select(m => CommonsNodeId.Parse(m.Address.Host ?? string.Empty))
.ToArray();
}
}
public CommonsNodeId? RoleLeader(string role)
{
lock (_lock)
{
return _roleLeaders.TryGetValue(role, out var leader) && leader is not null
? CommonsNodeId.Parse(leader.Address.Host ?? string.Empty)
: (CommonsNodeId?)null;
}
}
public event EventHandler<RoleLeaderChangedEventArgs>? RoleLeaderChanged;
private void SeedFromCurrentState()
{
var snapshot = _cluster.State;
lock (_lock)
{
foreach (var member in snapshot.Members)
foreach (var role in member.Roles)
{
if (!_membersByRole.TryGetValue(role, out var set))
_membersByRole[role] = set = new HashSet<Member>();
set.Add(member);
}
foreach (var role in snapshot.Members.SelectMany(m => m.Roles).Distinct())
{
var leaderAddr = _cluster.State.RoleLeader(role);
_roleLeaders[role] = leaderAddr is not null
? snapshot.Members.FirstOrDefault(m => m.Address == leaderAddr)
: null;
}
}
}
internal void HandleMemberEvent(ClusterEvent.IMemberEvent evt)
{
lock (_lock)
{
switch (evt)
{
case ClusterEvent.MemberUp up:
foreach (var role in up.Member.Roles)
{
if (!_membersByRole.TryGetValue(role, out var set))
_membersByRole[role] = set = new HashSet<Member>();
set.Add(up.Member);
}
break;
case ClusterEvent.MemberRemoved removed:
foreach (var role in removed.Member.Roles)
if (_membersByRole.TryGetValue(role, out var set))
set.Remove(removed.Member);
break;
}
}
}
internal void HandleRoleLeaderChanged(ClusterEvent.RoleLeaderChanged evt)
{
CommonsNodeId? previous = null;
CommonsNodeId? next = null;
var raise = false;
lock (_lock)
{
_roleLeaders.TryGetValue(evt.Role, out var prevMember);
if (prevMember is not null)
previous = CommonsNodeId.Parse(prevMember.Address.Host ?? string.Empty);
var nextMember = evt.Leader is null
? null
: _cluster.State.Members.FirstOrDefault(m => m.Address == evt.Leader);
_roleLeaders[evt.Role] = nextMember;
if (nextMember is not null)
next = CommonsNodeId.Parse(nextMember.Address.Host ?? string.Empty);
raise = !Nullable.Equals(previous, next);
}
if (!raise) return;
try
{
RoleLeaderChanged?.Invoke(this, new RoleLeaderChangedEventArgs
{
Role = evt.Role,
PreviousLeader = previous,
NewLeader = next,
});
}
catch (Exception ex)
{
_logger.LogWarning(ex, "RoleLeaderChanged subscriber threw for role {Role}", evt.Role);
}
}
public void Dispose()
{
_subscriber?.Tell(PoisonPill.Instance);
_subscriber = null;
}
private sealed class SubscriberActor : ReceiveActor
{
public SubscriberActor(ClusterRoleInfo owner)
{
Receive<ClusterEvent.IMemberEvent>(e => owner.HandleMemberEvent(e));
Receive<ClusterEvent.RoleLeaderChanged>(e => owner.HandleRoleLeaderChanged(e));
Receive<ClusterEvent.LeaderChanged>(_ => { /* no-op for now; reserved for ServiceLevel calc */ });
Receive<ClusterEvent.CurrentClusterState>(_ => { /* seeded from initial snapshot */ });
}
protected override void PreStart()
{
Akka.Cluster.Cluster.Get(Context.System).Subscribe(
Self,
ClusterEvent.InitialStateAsEvents,
typeof(ClusterEvent.IMemberEvent),
typeof(ClusterEvent.LeaderChanged),
typeof(ClusterEvent.RoleLeaderChanged));
}
protected override void PostStop() =>
Akka.Cluster.Cluster.Get(Context.System).Unsubscribe(Self);
}
}